This is an automated email from the ASF dual-hosted git repository.
davidrad pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 56af6bf3 [FLINK-33634] Add Conditions to Flink CRD's Status field
(#1064)
56af6bf3 is described below
commit 56af6bf3a94494923497e709faf29fb3749cd45f
Author: David Radley <[email protected]>
AuthorDate: Mon Mar 2 15:17:23 2026 +0000
[FLINK-33634] Add Conditions to Flink CRD's Status field (#1064)
Added conditions to FlinkDeployment, FlinkSessionJob, and FlinkStateSnapshot
CRD status fields to provide standardised Kubernetes-style status reporting.
Signed-off-by: [email protected] <[email protected]>
Co-authored-by: Lajith <[email protected]>
---
.gitignore | 4 +-
docs/content/docs/custom-resource/reference.md | 3 +
.../flink/kubernetes/operator/api}/Mode.java | 5 +-
.../operator/api/status/FlinkDeploymentStatus.java | 45 +++
.../api/status/JobManagerDeploymentStatus.java | 28 +-
.../operator/api/utils/ConditionsUtils.java | 123 ++++++++
.../FlinkDeploymentStatusConditionsTest.java | 320 +++++++++++++++++++++
.../operator/api/utils/BaseTestUtils.java | 43 +++
.../controller/FlinkDeploymentController.java | 1 +
.../observer/deployment/ApplicationObserver.java | 2 +-
.../deployment/FlinkDeploymentObserverFactory.java | 2 +-
.../observer/deployment/SessionObserver.java | 2 +-
.../reconciler/deployment/ReconcilerFactory.java | 2 +-
.../operator/service/StandaloneFlinkService.java | 2 +-
.../controller/FlinkDeploymentControllerTest.java | 25 +-
.../service/StandaloneFlinkServiceTest.java | 2 +-
.../crds/flinkdeployments.flink.apache.org-v1.yml | 17 ++
17 files changed, 607 insertions(+), 19 deletions(-)
diff --git a/.gitignore b/.gitignore
index 0fd15a8f..527bce74 100644
--- a/.gitignore
+++ b/.gitignore
@@ -40,4 +40,6 @@ buildNumber.properties
.kube
# VSCode settings
-.vscode/
\ No newline at end of file
+.vscode/
+.metals
+.bloop
diff --git a/docs/content/docs/custom-resource/reference.md
b/docs/content/docs/custom-resource/reference.md
index 35f6e9be..76b303ce 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -397,6 +397,7 @@ This serves as a full reference for FlinkDeployment and
FlinkSessionJob custom r
| jobManagerDeploymentStatus |
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus |
Last observed status of the JobManager deployment. |
| reconciliationStatus |
org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus
| Status of the last reconcile operation. |
| taskManager |
org.apache.flink.kubernetes.operator.api.status.TaskManagerInfo | Information
about the TaskManagers for the scale subresource. |
+| conditions | java.util.List<io.fabric8.kubernetes.api.model.Condition> |
Conditions of the custom resource. |
### FlinkSessionJobReconciliationStatus
**Class**:
org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobReconciliationStatus
@@ -476,6 +477,8 @@ This serves as a full reference for FlinkDeployment and
FlinkSessionJob custom r
| DEPLOYING | JobManager process is starting up. |
| MISSING | JobManager deployment not found, probably not started or killed by
user. |
| ERROR | Deployment in terminal error, requires spec change for
reconciliation to continue. |
+| reason | java.lang.String | |
+| message | java.lang.String | |
### JobStatus
**Class**: org.apache.flink.kubernetes.operator.api.status.JobStatus
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/Mode.java
similarity index 91%
rename from
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java
rename to
flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/Mode.java
index adb37e7c..5f5392d5 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/Mode.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/Mode.java
@@ -16,9 +16,8 @@
* limitations under the License.
*/
-package org.apache.flink.kubernetes.operator.config;
+package org.apache.flink.kubernetes.operator.api;
-import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
/** The mode of {@link FlinkDeployment}. */
@@ -45,7 +44,7 @@ public enum Mode {
: getMode(lastReconciledSpec);
}
- private static Mode getMode(FlinkDeploymentSpec spec) {
+ public static Mode getMode(FlinkDeploymentSpec spec) {
return spec.getJob() != null ? APPLICATION : SESSION;
}
}
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java
index 9a341299..afdb1c26 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatus.java
@@ -18,10 +18,13 @@
package org.apache.flink.kubernetes.operator.api.status;
import org.apache.flink.annotation.Experimental;
+import org.apache.flink.kubernetes.operator.api.Mode;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.utils.ConditionsUtils;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import io.fabric8.kubernetes.api.model.Condition;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -29,7 +32,9 @@ import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
/** Last observed status of the Flink deployment. */
@@ -43,6 +48,8 @@ import java.util.Map;
@JsonIgnoreProperties(ignoreUnknown = true)
public class FlinkDeploymentStatus extends CommonStatus<FlinkDeploymentSpec> {
+ public static final String CONDITION_TYPE_RUNNING = "Running";
+
/** Information from running clusters. */
private Map<String, String> clusterInfo = new HashMap<>();
@@ -57,9 +64,47 @@ public class FlinkDeploymentStatus extends
CommonStatus<FlinkDeploymentSpec> {
/** Information about the TaskManagers for the scale subresource. */
private TaskManagerInfo taskManager;
+    /** Conditions of the custom resource. */
+ private List<Condition> conditions = new ArrayList<>();
+
@JsonIgnore
@Override
public boolean isJobCancellable() {
return super.isJobCancellable() &&
jobManagerDeploymentStatus.isRestApiAvailable();
}
+
+    /**
+     * Recomputes the {@code Running} condition from the last reconciled spec and stores the
+     * result as the complete conditions list of this status.
+     *
+     * <p>For an application deployment the condition is derived from the job state, for a
+     * session deployment from the JobManager deployment status. If there is no reconciliation
+     * status or no last reconciled spec, the conditions list is replaced by an empty one.
+     *
+     * @return the list that was stored: either empty or holding the single Running condition.
+     */
+    public List<Condition> setRunningConditionIfRequired() {
+        final FlinkDeploymentSpec lastReconciledSpec =
+                reconciliationStatus == null
+                        ? null
+                        : reconciliationStatus.deserializeLastReconciledSpec();
+
+        Condition runningCondition = null;
+        if (lastReconciledSpec != null) {
+            final Mode mode = Mode.getMode(lastReconciledSpec);
+            if (mode == Mode.APPLICATION) {
+                runningCondition =
+                        ConditionsUtils.createApplicationModeCondition(getJobStatus().getState());
+            } else if (mode == Mode.SESSION) {
+                runningCondition =
+                        ConditionsUtils.createSessionModeCondition(jobManagerDeploymentStatus);
+            }
+            // Compare against the conditions currently stored, before they are replaced below.
+            ConditionsUtils.updateLastTransitionTime(conditions, runningCondition);
+        }
+
+        final List<Condition> updatedConditions = new ArrayList<>();
+        if (runningCondition != null) {
+            updatedConditions.add(runningCondition);
+        }
+        setConditions(updatedConditions);
+        return updatedConditions;
+    }
}
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java
index 2c86c49a..98250774 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobManagerDeploymentStatus.java
@@ -21,20 +21,38 @@ package org.apache.flink.kubernetes.operator.api.status;
public enum JobManagerDeploymentStatus {
/** JobManager is running and ready to receive REST API calls. */
- READY,
+ READY("JobManagerReady", "JobManager is running and ready to receive REST
API calls"),
/** JobManager is running but not ready yet to receive REST API calls. */
- DEPLOYED_NOT_READY,
+ DEPLOYED_NOT_READY(
+ "DeployedNotReady",
+ "JobManager is running but not yet ready to receive REST API
calls"),
/** JobManager process is starting up. */
- DEPLOYING,
+ DEPLOYING("JobManagerIsDeploying", "JobManager process is starting up"),
/** JobManager deployment not found, probably not started or killed by
user. */
// TODO: currently a mix of SUSPENDED and ERROR, needs cleanup
- MISSING,
+ MISSING("JobManagerDeploymentMissing", "JobManager deployment not found"),
/** Deployment in terminal error, requires spec change for reconciliation
to continue. */
- ERROR;
+ ERROR("Error", "JobManager deployment failed");
+
+    /** CamelCase reason used for the condition that is derived from this status. */
+    private final String reason;
+
+    /** Human-readable message used for the condition that is derived from this status. */
+    private final String message;
+
+    /**
+     * Creates the enum constant with its condition reason and message.
+     *
+     * @param reason the condition reason for this status
+     * @param message the condition message for this status
+     */
+    JobManagerDeploymentStatus(String reason, String message) {
+        this.reason = reason;
+        this.message = message;
+    }
+
+    /**
+     * Returns the condition reason associated with this status.
+     *
+     * @return the reason string
+     */
+    public String getReason() {
+        return reason;
+    }
+
+    /**
+     * Returns the condition message associated with this status.
+     *
+     * @return the message string
+     */
+    public String getMessage() {
+        return message;
+    }
public boolean isRestApiAvailable() {
return this == READY;
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionsUtils.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionsUtils.java
new file mode 100644
index 00000000..d8a25a59
--- /dev/null
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/utils/ConditionsUtils.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.api.utils;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
+import
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
+
+import io.fabric8.kubernetes.api.model.Condition;
+import io.fabric8.kubernetes.api.model.ConditionBuilder;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+
+import static org.apache.flink.api.common.JobStatus.RUNNING;
+import static
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus.READY;
+
+/** Utility methods for working with Kubernetes {@link Condition} objects. */
+public class ConditionsUtils {
+
+    /** Condition status reported when the state the condition is derived from is absent. */
+    private static final String STATUS_UNKNOWN = "Unknown";
+
+    /**
+     * Sets the last transition time of the given condition based on the existing conditions
+     * list. If a condition of the same type already exists with the same status and a
+     * transition time, that transition time is preserved; otherwise a new timestamp is set.
+     *
+     * @param conditions the current list of conditions, may be null or contain null entries
+     * @param condition the new condition whose last transition time should be set; nothing
+     *     happens if it is null
+     */
+    public static void updateLastTransitionTime(List<Condition> conditions, Condition condition) {
+        if (condition == null) {
+            return;
+        }
+        Condition existingCondition = null;
+        if (conditions != null) {
+            existingCondition =
+                    conditions.stream()
+                            .filter(c -> c != null)
+                            .filter(c -> Objects.equals(c.getType(), condition.getType()))
+                            .findFirst()
+                            .orElse(null);
+        }
+        condition.setLastTransitionTime(getLastTransitionTimeStamp(existingCondition, condition));
+    }
+
+    /**
+     * Creates a Running condition for application mode based on the given {@link JobStatus}.
+     *
+     * @param jobStatus the current job status; if null the condition status is "Unknown"
+     * @return a {@link Condition} reflecting whether the job is running
+     */
+    public static Condition createApplicationModeCondition(JobStatus jobStatus) {
+        if (jobStatus == null) {
+            // No job state is available: report Unknown instead of failing with an NPE.
+            return new ConditionBuilder()
+                    .withType(FlinkDeploymentStatus.CONDITION_TYPE_RUNNING)
+                    .withStatus(STATUS_UNKNOWN)
+                    .withReason("JobStatusUnknown")
+                    .withMessage("Job status is not available")
+                    .build();
+        }
+        return new ConditionBuilder()
+                .withType(FlinkDeploymentStatus.CONDITION_TYPE_RUNNING)
+                .withStatus(jobStatus == RUNNING ? "True" : "False")
+                .withReason(toCamelCase(jobStatus.name()))
+                .withMessage("Job status " + jobStatus.name())
+                .build();
+    }
+
+    /**
+     * Creates a Running condition for session mode based on the given {@link
+     * JobManagerDeploymentStatus}.
+     *
+     * @param jmStatus the current JobManager deployment status; if null the condition status
+     *     is "Unknown"
+     * @return a {@link Condition} reflecting whether the session cluster is running
+     */
+    public static Condition createSessionModeCondition(JobManagerDeploymentStatus jmStatus) {
+        if (jmStatus == null) {
+            // No deployment status is available: report Unknown instead of failing with an NPE.
+            return new ConditionBuilder()
+                    .withType(FlinkDeploymentStatus.CONDITION_TYPE_RUNNING)
+                    .withStatus(STATUS_UNKNOWN)
+                    .withReason("JobManagerStatusUnknown")
+                    .withMessage("JobManager deployment status is not available")
+                    .build();
+        }
+        return new ConditionBuilder()
+                .withType(FlinkDeploymentStatus.CONDITION_TYPE_RUNNING)
+                .withStatus(jmStatus == READY ? "True" : "False")
+                .withReason(jmStatus.getReason())
+                .withMessage(jmStatus.getMessage())
+                .build();
+    }
+
+    /**
+     * Converts an upper-case, single-word name such as {@code RUNNING} into the CamelCase form
+     * expected for a condition reason ({@code Running}) by lower-casing it and upper-casing
+     * the first letter.
+     *
+     * <p>The conversion uses {@link Locale#ROOT} so the result does not depend on the default
+     * locale of the JVM (for example the dotless i of the Turkish locale).
+     *
+     * @param reason the string to convert
+     * @return the CamelCase reason, or the input itself if it is null or empty
+     */
+    private static String toCamelCase(String reason) {
+        if (reason == null || reason.isEmpty()) {
+            return reason;
+        }
+        String lowerCased = reason.toLowerCase(Locale.ROOT);
+        return lowerCased.substring(0, 1).toUpperCase(Locale.ROOT) + lowerCased.substring(1);
+    }
+
+    /**
+     * Gets the last transition time for the condition.
+     *
+     * @param existingCondition the current condition object, may be null
+     * @param condition the new condition object to compare against the existing one
+     * @return the last transition time of the existing condition if it exists, has a
+     *     transition time and has the same status as the new condition; otherwise the current
+     *     time as an ISO-8601 string (e.g., "2025-10-30T07:35:35.189752790Z")
+     */
+    private static String getLastTransitionTimeStamp(
+            Condition existingCondition, Condition condition) {
+        if (existingCondition == null
+                || existingCondition.getLastTransitionTime() == null
+                || !Objects.equals(existingCondition.getStatus(), condition.getStatus())) {
+            return Instant.now().toString();
+        }
+        return existingCondition.getLastTransitionTime();
+    }
+}
diff --git
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatusConditionsTest.java
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatusConditionsTest.java
new file mode 100644
index 00000000..0800dcbc
--- /dev/null
+++
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/status/FlinkDeploymentStatusConditionsTest.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.api.status;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils;
+
+import io.fabric8.kubernetes.api.model.Condition;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test for {@link FlinkDeploymentStatus#setRunningConditionIfRequired()}. */
+class FlinkDeploymentStatusConditionsTest {
+
+    /**
+     * Fixed timestamp in the past. Seeding an existing condition with it makes the transition
+     * time assertions independent of the resolution of the system clock.
+     */
+    private static final String PAST_TIMESTAMP = "2000-01-01T00:00:00Z";
+
+    /**
+     * Helper method to get a condition by type from a list of conditions.
+     *
+     * @param conditions the list of conditions
+     * @param type the condition type to find
+     * @return the condition with the specified type, or null if not found
+     */
+    private Condition getConditionByType(List<Condition> conditions, String type) {
+        return conditions.stream().filter(c -> type.equals(c.getType())).findFirst().orElse(null);
+    }
+
+    /**
+     * Recomputes the conditions of the given status and returns the single Running condition,
+     * asserting that it is the only condition and that it carries a transition time.
+     *
+     * @param status the status to recompute the conditions for
+     * @return the Running condition produced by the call
+     */
+    private Condition refreshRunningCondition(FlinkDeploymentStatus status) {
+        List<Condition> conditions = status.setRunningConditionIfRequired();
+        assertEquals(1, conditions.size(), "Should return one condition");
+        Condition condition =
+                getConditionByType(conditions, FlinkDeploymentStatus.CONDITION_TYPE_RUNNING);
+        assertNotNull(condition);
+        assertEquals(FlinkDeploymentStatus.CONDITION_TYPE_RUNNING, condition.getType());
+        assertNotNull(condition.getLastTransitionTime());
+        return condition;
+    }
+
+    /**
+     * Asserts the Running condition produced for an application deployment in the given job
+     * state.
+     *
+     * @param jobState the job state of the deployment
+     * @param expectedStatus the expected condition status ("True" or "False")
+     * @param expectedReason the expected CamelCase condition reason
+     */
+    private void assertApplicationModeCondition(
+            JobStatus jobState, String expectedStatus, String expectedReason) {
+        Condition condition =
+                refreshRunningCondition(BaseTestUtils.createApplicationModeStatus(jobState));
+        assertEquals(expectedStatus, condition.getStatus());
+        assertEquals(expectedReason, condition.getReason());
+        assertEquals("Job status " + jobState.name(), condition.getMessage());
+    }
+
+    /**
+     * Asserts the Running condition produced for a session deployment with the given
+     * JobManager deployment status.
+     *
+     * @param jmStatus the JobManager deployment status of the deployment
+     * @param expectedStatus the expected condition status ("True" or "False")
+     */
+    private void assertSessionModeCondition(
+            JobManagerDeploymentStatus jmStatus, String expectedStatus) {
+        Condition condition =
+                refreshRunningCondition(BaseTestUtils.createSessionModeStatus(jmStatus));
+        assertEquals(expectedStatus, condition.getStatus());
+        assertEquals(jmStatus.getReason(), condition.getReason());
+        assertEquals(jmStatus.getMessage(), condition.getMessage());
+    }
+
+    /**
+     * Replaces the transition time of the given condition with {@link #PAST_TIMESTAMP} and
+     * stores the condition as the existing condition of the status.
+     *
+     * @param status the status to store the condition in
+     * @param condition the condition to seed
+     */
+    private void seedExistingCondition(FlinkDeploymentStatus status, Condition condition) {
+        condition.setLastTransitionTime(PAST_TIMESTAMP);
+        status.setConditions(List.of(condition));
+    }
+
+    @Test
+    void testCreateConditionFromStatusWithNullReconciliationStatus() {
+        FlinkDeploymentStatus status = new FlinkDeploymentStatus();
+        status.setReconciliationStatus(null);
+
+        List<Condition> conditions = status.setRunningConditionIfRequired();
+
+        assertTrue(
+                conditions.isEmpty(),
+                "Should return empty list when reconciliation status is null");
+    }
+
+    @Test
+    void testCreateConditionFromStatusWithNullDeploymentSpec() {
+        FlinkDeploymentStatus status = new FlinkDeploymentStatus();
+        status.setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
+
+        List<Condition> conditions = status.setRunningConditionIfRequired();
+
+        assertTrue(conditions.isEmpty(), "Should return empty list when deployment spec is null");
+    }
+
+    @Test
+    void testApplicationModeConditionWithRunningJob() {
+        assertApplicationModeCondition(JobStatus.RUNNING, "True", "Running");
+    }
+
+    @Test
+    void testApplicationModeConditionWithFailedJob() {
+        assertApplicationModeCondition(JobStatus.FAILED, "False", "Failed");
+    }
+
+    @Test
+    void testApplicationModeConditionWithCanceledJob() {
+        assertApplicationModeCondition(JobStatus.CANCELED, "False", "Canceled");
+    }
+
+    @Test
+    void testApplicationModeConditionWithFinishedJob() {
+        assertApplicationModeCondition(JobStatus.FINISHED, "False", "Finished");
+    }
+
+    @Test
+    void testApplicationModeConditionWithCreatedJob() {
+        assertApplicationModeCondition(JobStatus.CREATED, "False", "Created");
+    }
+
+    @Test
+    void testSessionModeConditionWithReadyJobManager() {
+        assertSessionModeCondition(JobManagerDeploymentStatus.READY, "True");
+    }
+
+    @Test
+    void testSessionModeConditionWithDeployingJobManager() {
+        assertSessionModeCondition(JobManagerDeploymentStatus.DEPLOYING, "False");
+    }
+
+    @Test
+    void testSessionModeConditionWithMissingJobManager() {
+        assertSessionModeCondition(JobManagerDeploymentStatus.MISSING, "False");
+    }
+
+    @Test
+    void testConditionTypeIsAlwaysRunning() {
+        // Test application mode
+        Condition appCondition =
+                refreshRunningCondition(
+                        BaseTestUtils.createApplicationModeStatus(JobStatus.RUNNING));
+        assertEquals(FlinkDeploymentStatus.CONDITION_TYPE_RUNNING, appCondition.getType());
+
+        // Test session mode
+        Condition sessionCondition =
+                refreshRunningCondition(
+                        BaseTestUtils.createSessionModeStatus(JobManagerDeploymentStatus.READY));
+        assertEquals(FlinkDeploymentStatus.CONDITION_TYPE_RUNNING, sessionCondition.getType());
+    }
+
+    @Test
+    void testGetLastTransitionTimeStamp_StatusUnchanged() {
+        // Test that timestamp is preserved when status doesn't change
+        FlinkDeploymentStatus status = BaseTestUtils.createApplicationModeStatus(JobStatus.RUNNING);
+
+        // First call - creates the initial condition, which becomes the existing condition
+        seedExistingCondition(status, refreshRunningCondition(status));
+
+        // Second call - status unchanged (still RUNNING)
+        Condition secondCondition = refreshRunningCondition(status);
+
+        // Timestamp should be preserved since status didn't change
+        assertEquals(PAST_TIMESTAMP, secondCondition.getLastTransitionTime());
+    }
+
+    @Test
+    void testGetLastTransitionTimeStamp_StatusChanged() {
+        // Test that timestamp is updated when status changes
+        FlinkDeploymentStatus status = BaseTestUtils.createApplicationModeStatus(JobStatus.RUNNING);
+
+        // First call - creates the initial condition with RUNNING status
+        seedExistingCondition(status, refreshRunningCondition(status));
+
+        // Change status to FAILED
+        status.getJobStatus().setState(JobStatus.FAILED);
+
+        // Second call - status changed to FAILED
+        Condition secondCondition = refreshRunningCondition(status);
+
+        // Timestamp should be different since status changed
+        org.junit.jupiter.api.Assertions.assertNotEquals(
+                PAST_TIMESTAMP,
+                secondCondition.getLastTransitionTime(),
+                "Timestamp should be updated when status changes");
+    }
+
+    @Test
+    void testGetLastTransitionTimeStamp_SessionMode() {
+        // Test timestamp behavior in session mode
+        FlinkDeploymentStatus status =
+                BaseTestUtils.createSessionModeStatus(JobManagerDeploymentStatus.READY);
+
+        // First call
+        seedExistingCondition(status, refreshRunningCondition(status));
+
+        // Second call with same status
+        Condition secondCondition = refreshRunningCondition(status);
+        // Timestamp should be preserved
+        assertEquals(PAST_TIMESTAMP, secondCondition.getLastTransitionTime());
+
+        // Change to DEPLOYING
+        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+
+        // Third call with changed status
+        Condition thirdCondition = refreshRunningCondition(status);
+        // Timestamp should be updated
+        org.junit.jupiter.api.Assertions.assertNotEquals(
+                PAST_TIMESTAMP,
+                thirdCondition.getLastTransitionTime(),
+                "Timestamp should be updated when JobManager status changes");
+    }
+}
diff --git
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
index f3bd54f2..5a47bbd2 100644
---
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
+++
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.api.utils;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
@@ -39,8 +40,10 @@ import
org.apache.flink.kubernetes.operator.api.spec.SavepointSpec;
import org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.CheckpointType;
+import
org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
+import
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
@@ -49,6 +52,7 @@ import io.fabric8.kubernetes.api.model.PodSpec;
import io.fabric8.kubernetes.api.model.PodTemplateSpec;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -268,4 +272,43 @@ public class BaseTestUtils {
return snapshot;
}
+
+    /**
+     * Builds a {@link FlinkDeploymentStatus} for tests whose last reconciled spec is the spec
+     * of the deployment returned by {@code buildApplicationCluster()}.
+     *
+     * <p>The returned status has the given job state, an empty mutable conditions list and a
+     * JobManager deployment status of {@code READY}.
+     *
+     * @param jobStatus the Flink job state to store in the status
+     * @return the prepared status
+     */
+    public static FlinkDeploymentStatus createApplicationModeStatus(JobStatus jobStatus) {
+        FlinkDeployment applicationCluster = BaseTestUtils.buildApplicationCluster();
+        FlinkDeploymentReconciliationStatus lastReconciled =
+                new FlinkDeploymentReconciliationStatus();
+        lastReconciled.setLastReconciledSpec(
+                SpecUtils.writeSpecWithMeta(applicationCluster.getSpec(), applicationCluster));
+
+        // Fully qualified: the operator JobStatus wrapper shares its simple name with Flink's enum.
+        org.apache.flink.kubernetes.operator.api.status.JobStatus operatorJobStatus =
+                new org.apache.flink.kubernetes.operator.api.status.JobStatus();
+        operatorJobStatus.setState(jobStatus);
+
+        FlinkDeploymentStatus result = new FlinkDeploymentStatus();
+        result.setJobStatus(operatorJobStatus);
+        result.setConditions(new ArrayList<>());
+        result.setReconciliationStatus(lastReconciled);
+        result.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+        return result;
+    }
+
+    /**
+     * Builds a {@link FlinkDeploymentStatus} for tests whose last reconciled spec is the spec
+     * of the deployment returned by {@code buildSessionCluster()}.
+     *
+     * <p>The returned status has the given JobManager deployment status and an empty mutable
+     * conditions list.
+     *
+     * @param jmStatus the JobManager deployment status to store in the status
+     * @return the prepared status
+     */
+    public static FlinkDeploymentStatus createSessionModeStatus(
+            JobManagerDeploymentStatus jmStatus) {
+        FlinkDeployment sessionCluster = BaseTestUtils.buildSessionCluster();
+        FlinkDeploymentReconciliationStatus lastReconciled =
+                new FlinkDeploymentReconciliationStatus();
+        lastReconciled.setLastReconciledSpec(
+                SpecUtils.writeSpecWithMeta(sessionCluster.getSpec(), sessionCluster));
+
+        FlinkDeploymentStatus result = new FlinkDeploymentStatus();
+        result.setJobManagerDeploymentStatus(jmStatus);
+        result.setConditions(new ArrayList<>());
+        result.setReconciliationStatus(lastReconciled);
+        return result;
+    }
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 32a3109d..69909c5c 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -165,6 +165,7 @@ public class FlinkDeploymentController
throw new ReconciliationException(e);
}
+ flinkApp.getStatus().setRunningConditionIfRequired();
LOG.debug("End of reconciliation");
statusRecorder.patchAndCacheStatus(flinkApp,
ctx.getKubernetesClient());
return ReconciliationUtils.toUpdateControl(
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
index 90555407..8f12183d 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserver.java
@@ -27,7 +27,7 @@ import
org.apache.flink.kubernetes.operator.utils.EventRecorder;
import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
-/** The observer of {@link
org.apache.flink.kubernetes.operator.config.Mode#APPLICATION} cluster. */
+/** The observer of {@link
org.apache.flink.kubernetes.operator.api.Mode#APPLICATION} cluster. */
public class ApplicationObserver extends AbstractFlinkDeploymentObserver {
private final SnapshotObserver<FlinkDeployment, FlinkDeploymentStatus>
savepointObserver;
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/FlinkDeploymentObserverFactory.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/FlinkDeploymentObserverFactory.java
index c5e11241..6facfb16 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/FlinkDeploymentObserverFactory.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/FlinkDeploymentObserverFactory.java
@@ -19,8 +19,8 @@ package
org.apache.flink.kubernetes.operator.observer.deployment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.Mode;
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
-import org.apache.flink.kubernetes.operator.config.Mode;
import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
index 84f213e7..4b0ba0d3 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
@@ -24,7 +24,7 @@ import
org.apache.flink.kubernetes.operator.utils.EventRecorder;
import java.util.concurrent.TimeoutException;
-/** The observer of the {@link
org.apache.flink.kubernetes.operator.config.Mode#SESSION} cluster. */
+/** The observer of the {@link
org.apache.flink.kubernetes.operator.api.Mode#SESSION} cluster. */
public class SessionObserver extends AbstractFlinkDeploymentObserver {
public SessionObserver(EventRecorder eventRecorder) {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
index 429caaa6..3a389a79 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
@@ -20,10 +20,10 @@ package
org.apache.flink.kubernetes.operator.reconciler.deployment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.Mode;
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import
org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
-import org.apache.flink.kubernetes.operator.config.Mode;
import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
index 7c483f7f..ac3dab33 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -27,10 +27,10 @@ import
org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.Mode;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
-import org.apache.flink.kubernetes.operator.config.Mode;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import
org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient;
import
org.apache.flink.kubernetes.operator.kubeclient.FlinkStandaloneKubeClient;
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 4d379139..65cdfaf0 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -130,7 +130,7 @@ public class FlinkDeploymentControllerTest {
assertEquals(
org.apache.flink.api.common.JobStatus.RUNNING,
appCluster.getStatus().getJobStatus().getState());
- assertEquals(7, testController.getInternalStatusUpdateCount());
+ assertEquals(8, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isPatchStatus());
FlinkDeploymentReconciliationStatus reconciliationStatus =
@@ -277,6 +277,8 @@ public class FlinkDeploymentControllerTest {
validatingResponseProvider.assertValidated();
+ validateConditionStatus(appCluster, "Reconciling");
+
// Validate status
assertNotNull(appCluster.getStatus().getError());
@@ -359,7 +361,7 @@ public class FlinkDeploymentControllerTest {
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals(
"savepoint_1",
appCluster.getStatus().getJobStatus().getUpgradeSavepointPath());
-
+ validateConditionStatus(appCluster, "Finished");
// Resume from last savepoint
appCluster.getSpec().getJob().setState(JobState.RUNNING);
testController.reconcile(appCluster, context);
@@ -618,6 +620,7 @@ public class FlinkDeploymentControllerTest {
// jobStatus has not been set at this time
assertEquals(org.apache.flink.api.common.JobStatus.RECONCILING,
jobStatus.getState());
+ validateConditionStatus(appCluster, "Reconciling");
// Switches operator mode to SESSION
appCluster.getSpec().setJob(null);
// Validation fails and JobObserver should still be used
@@ -639,6 +642,7 @@ public class FlinkDeploymentControllerTest {
assertEquals(expectedJobStatus.getJobId().toHexString(),
jobStatus.getJobId());
assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
assertEquals(expectedJobStatus.getJobState(), jobStatus.getState());
+ validateConditionStatus(appCluster, "Running");
}
@Test
@@ -652,6 +656,7 @@ public class FlinkDeploymentControllerTest {
assertEquals(
JobManagerDeploymentStatus.DEPLOYING,
appCluster.getStatus().getJobManagerDeploymentStatus());
+ validateConditionStatus(appCluster, "JobManagerIsDeploying");
updateControl = testController.reconcile(appCluster, context);
JobStatus jobStatus = appCluster.getStatus().getJobStatus();
@@ -662,6 +667,8 @@ public class FlinkDeploymentControllerTest {
// jobStatus has not been set at this time
assertNull(jobStatus.getState());
+ validateConditionStatus(appCluster, "DeployedNotReady");
+
// Switches operator mode to APPLICATION
appCluster.getSpec().setJob(TestUtils.buildSessionJob().getSpec().getJob());
// Validation fails and JobObserver should still be used
@@ -676,6 +683,8 @@ public class FlinkDeploymentControllerTest {
.getError()
.contains("Cannot switch from session to job
cluster"));
assertNull(ReconciliationUtils.getDeployedSpec(appCluster).getJob());
+
+ validateConditionStatus(appCluster, "JobManagerReady");
}
private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws
Exception {
@@ -1169,7 +1178,7 @@ public class FlinkDeploymentControllerTest {
assertEquals(
org.apache.flink.api.common.JobStatus.RUNNING,
appCluster.getStatus().getJobStatus().getState());
- assertEquals(6, testController.getInternalStatusUpdateCount());
+ assertEquals(7, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isPatchStatus());
assertEquals(
Optional.of(
@@ -1184,7 +1193,7 @@ public class FlinkDeploymentControllerTest {
assertEquals(
org.apache.flink.api.common.JobStatus.RUNNING,
appCluster.getStatus().getJobStatus().getState());
- assertEquals(6, testController.getInternalStatusUpdateCount());
+ assertEquals(7, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isPatchStatus());
assertEquals(
Optional.of(
@@ -1254,4 +1263,12 @@ public class FlinkDeploymentControllerTest {
return ingressRuleV1beta1.getHost();
}
}
+
+    /**
+     * Asserts that the deployment's status holds a non-null conditions list with exactly one
+     * entry, and that the entry's {@code reason} property matches the expected value.
+     *
+     * @param appCluster deployment whose status conditions are inspected
+     * @param reason expected {@code reason} of the single condition
+     */
+    private void validateConditionStatus(FlinkDeployment appCluster, String reason) {
+        // Same checks in the same order as before: non-null, single element, matching reason.
+        assertThat(appCluster.getStatus().getConditions())
+                .isNotNull()
+                .hasSize(1)
+                .extracting("reason")
+                .contains(reason);
+    }
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
index fde2b124..a871466b 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
@@ -24,11 +24,11 @@ import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.Mode;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
-import org.apache.flink.kubernetes.operator.config.Mode;
import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
import org.apache.flink.util.concurrent.Executors;
diff --git
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 33ab49e8..c81b2b10 100644
---
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -10685,6 +10685,23 @@ spec:
additionalProperties:
type: string
type: object
+ conditions:
+ items:
+ properties:
+ lastTransitionTime:
+ type: string
+ message:
+ type: string
+ observedGeneration:
+ type: integer
+ reason:
+ type: string
+ status:
+ type: string
+ type:
+ type: string
+ type: object
+ type: array
error:
type: string
jobManagerDeploymentStatus: