This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new b3b45ba0 [FLINK-36425][refactor] Replace jobStatus.state string with 
Flink JobStatus enum
b3b45ba0 is described below

commit b3b45ba0bd31322f07d7b3aa0a9e10bba923ae22
Author: Gyula Fora <[email protected]>
AuthorDate: Sun Aug 18 11:59:58 2024 +0200

    [FLINK-36425][refactor] Replace jobStatus.state string with Flink JobStatus 
enum
---
 docs/content/docs/custom-resource/reference.md     |   2 +-
 flink-kubernetes-operator-api/pom.xml              |   4 +-
 .../operator/api/status/CommonStatus.java          |   5 +-
 .../kubernetes/operator/api/status/JobStatus.java  |   2 +-
 .../operator/api/status/SavepointInfo.java         |   2 +
 .../api/validation/CrdCompatibilityChecker.java    |   5 +-
 .../controller/FlinkDeploymentController.java      |   2 +-
 .../operator/controller/FlinkResourceContext.java  |  16 +--
 .../operator/observer/JobStatusObserver.java       |  12 +-
 .../AbstractFlinkDeploymentObserver.java           |   6 +-
 .../operator/reconciler/ReconciliationUtils.java   |  33 ++----
 .../AbstractFlinkResourceReconciler.java           |   3 +-
 .../deployment/AbstractJobReconciler.java          |   3 +-
 .../deployment/ApplicationReconciler.java          |   4 +-
 .../sessionjob/SessionJobReconciler.java           |   2 +-
 .../operator/service/AbstractFlinkService.java     |  11 +-
 .../operator/service/NativeFlinkService.java       |   2 +-
 .../controller/DeploymentRecoveryTest.java         |   6 +-
 .../controller/FailedDeploymentRestartTest.java    |   9 +-
 .../controller/FlinkDeploymentControllerTest.java  |  33 +++---
 .../controller/FlinkSessionJobControllerTest.java  |  40 +++----
 .../FlinkStateSnapshotControllerTest.java          |  12 +-
 .../operator/controller/RollbackTest.java          |   9 +-
 .../controller/UnhealthyDeploymentRestartTest.java |   9 +-
 .../lifecycle/ResourceLifecycleMetricsTest.java    |   8 +-
 .../operator/observer/JobStatusObserverTest.java   |   4 +-
 .../observer/SnapshotObserverLegacyTest.java       |   5 +-
 .../deployment/ApplicationObserverTest.java        |  17 +--
 .../sessionjob/FlinkSessionJobObserverTest.java    |  36 +++---
 .../deployment/ApplicationReconcilerTest.java      |  45 +++++---
 .../ApplicationReconcilerUpgradeModeTest.java      |  36 ++++--
 .../deployment/SessionReconcilerTest.java          |   2 +-
 .../sessionjob/SessionJobReconcilerTest.java       | 121 ++++++++++++---------
 .../operator/service/AbstractFlinkServiceTest.java |  34 +++---
 .../operator/service/NativeFlinkServiceTest.java   |  15 ++-
 .../utils/FlinkStateSnapshotUtilsTest.java         |   7 +-
 .../operator/utils/SnapshotUtilsTest.java          |   2 +-
 .../crds/flinkdeployments.flink.apache.org-v1.yml  |  12 ++
 .../crds/flinksessionjobs.flink.apache.org-v1.yml  |  12 ++
 39 files changed, 323 insertions(+), 265 deletions(-)

diff --git a/docs/content/docs/custom-resource/reference.md 
b/docs/content/docs/custom-resource/reference.md
index c336e1c4..600f6289 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -402,7 +402,7 @@ This serves as a full reference for FlinkDeployment and 
FlinkSessionJob custom r
 | ----------| ---- | ---- |
 | jobName | java.lang.String | Name of the job. |
 | jobId | java.lang.String | Flink JobId of the Job. |
-| state | java.lang.String | Last observed state of the job. |
+| state | org.apache.flink.api.common.JobStatus | Last observed state of the 
job. |
 | startTime | java.lang.String | Start time of the job. |
 | updateTime | java.lang.String | Update time of the job. |
 | upgradeSavepointPath | java.lang.String |  |
diff --git a/flink-kubernetes-operator-api/pom.xml 
b/flink-kubernetes-operator-api/pom.xml
index da1b59a6..40ec2642 100644
--- a/flink-kubernetes-operator-api/pom.xml
+++ b/flink-kubernetes-operator-api/pom.xml
@@ -226,7 +226,7 @@ under the License.
                                       fork="true" failonerror="true">
                                     <classpath 
refid="maven.compile.classpath"/>
                                     <arg 
value="file://${rootDir}/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml"/>
-                                    <arg 
value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.6.0/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml"/>
+                                    <arg 
value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.9.0/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml"/>
                                 </java>
                             </target>
                         </configuration>
@@ -243,7 +243,7 @@ under the License.
                                       fork="true" failonerror="true">
                                     <classpath 
refid="maven.compile.classpath"/>
                                     <arg 
value="file://${rootDir}/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml"/>
-                                    <arg 
value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.6.0/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml"/>
+                                    <arg 
value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.9.0/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml"/>
                                 </java>
                             </target>
                         </configuration>
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
index 0e9635cb..9d8c6475 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
@@ -81,10 +81,7 @@ public abstract class CommonStatus<SPEC extends 
AbstractFlinkSpec> {
             return ResourceLifecycleState.SUSPENDED;
         }
 
-        var jobState = getJobStatus().getState();
-        if (jobState != null
-                && org.apache.flink.api.common.JobStatus.valueOf(jobState)
-                        .equals(org.apache.flink.api.common.JobStatus.FAILED)) 
{
+        if (getJobStatus().getState() == 
org.apache.flink.api.common.JobStatus.FAILED) {
             return ResourceLifecycleState.FAILED;
         }
 
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
index 6adef53c..40d67a34 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
@@ -42,7 +42,7 @@ public class JobStatus {
 
     /** Last observed state of the job. */
     @PrinterColumn(name = "Job Status")
-    private String state;
+    private org.apache.flink.api.common.JobStatus state;
 
     /** Start time of the job. */
     private String startTime;
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/SavepointInfo.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/SavepointInfo.java
index bdbe8de0..6ae9c27d 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/SavepointInfo.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/SavepointInfo.java
@@ -88,6 +88,8 @@ public class SavepointInfo implements SnapshotInfo {
      */
     public void updateLastSavepoint(Savepoint savepoint) {
         if (savepoint == null) {
+            // In terminal states we have to handle the case when there is 
actually no savepoint to
+            // not restore from an old one
             lastSavepoint = null;
         } else if (lastSavepoint == null
                 || 
!lastSavepoint.getLocation().equals(savepoint.getLocation())) {
diff --git 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java
 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java
index faf6024a..8dce0d89 100644
--- 
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java
+++ 
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java
@@ -160,7 +160,10 @@ public class CrdCompatibilityChecker {
     protected static void checkStringTypeCompatibility(
             String path, JsonNode oldNode, JsonNode newNode) {
         if (!oldNode.has("enum") && newNode.has("enum")) {
-            err("Cannot turn string into enum for " + path);
+            // We make an exception here for jobstatus.state, this is a 
backward compatible change
+            if (!path.equals(".status.jobStatus.state")) {
+                err("Cannot turn string into enum for " + path);
+            }
         }
 
         if (oldNode.has("enum")) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index dbaa6d48..328c2fb7 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -176,7 +176,7 @@ public class FlinkDeploymentController
         var flinkApp = ctx.getResource();
         LOG.error("Flink Deployment failed", dfe);
         
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
-        
flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING.name());
+        flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
         ReconciliationUtils.updateForReconciliationError(ctx, dfe);
         eventRecorder.triggerEvent(
                 flinkApp,
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
index 0d5c5510..3010f478 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
-import 
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
 import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
@@ -73,7 +72,7 @@ public abstract class FlinkResourceContext<CR extends 
AbstractFlinkResource<?, ?
         CommonStatus<?> status = getResource().getStatus();
         String jobId = status.getJobStatus().getJobId();
 
-        JobStatus jobStatus = generateJobStatusEnum(status);
+        JobStatus jobStatus = status.getJobStatus().getState();
 
         return new KubernetesJobAutoScalerContext(
                 jobId == null ? null : JobID.fromHexString(jobId),
@@ -84,19 +83,6 @@ public abstract class FlinkResourceContext<CR extends 
AbstractFlinkResource<?, ?
                 this);
     }
 
-    @Nullable
-    private JobStatus generateJobStatusEnum(CommonStatus<?> status) {
-        if (status.getLifecycleState() != ResourceLifecycleState.STABLE) {
-            return null;
-        }
-
-        String state = status.getJobStatus().getState();
-        if (state == null) {
-            return null;
-        }
-        return JobStatus.valueOf(state);
-    }
-
     /**
      * Get the config that is currently deployed for the resource spec. The 
returned config may be
      * null in case the resource is not accessible/ready yet.
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index e299531f..c94c1231 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -122,7 +122,7 @@ public class JobStatusObserver<R extends 
AbstractFlinkResource<?, ?>> {
             // upgrading state and retry the upgrade (if possible)
             
resource.getStatus().getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
         }
-        jobStatus.setState(JobStatus.RECONCILING.name());
+        jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING);
         resource.getStatus().setError(JOB_NOT_FOUND_ERR);
     }
 
@@ -135,9 +135,9 @@ public class JobStatusObserver<R extends 
AbstractFlinkResource<?, ?>> {
      */
     private void ifRunningMoveToReconciling(
             org.apache.flink.kubernetes.operator.api.status.JobStatus 
jobStatus,
-            String previousJobStatus) {
-        if (JobStatus.RUNNING.name().equals(previousJobStatus)) {
-            jobStatus.setState(JobStatus.RECONCILING.name());
+            JobStatus previousJobStatus) {
+        if (JobStatus.RUNNING == previousJobStatus) {
+            jobStatus.setState(JobStatus.RECONCILING);
         }
     }
 
@@ -160,7 +160,7 @@ public class JobStatusObserver<R extends 
AbstractFlinkResource<?, ?>> {
         var previousJobStatus = jobStatus.getState();
         var currentJobStatus = clusterJobStatus.getJobState();
 
-        jobStatus.setState(clusterJobStatus.getJobState().name());
+        jobStatus.setState(currentJobStatus);
         jobStatus.setJobName(clusterJobStatus.getJobName());
         
jobStatus.setStartTime(String.valueOf(clusterJobStatus.getStartTime()));
 
@@ -177,7 +177,7 @@ public class JobStatusObserver<R extends 
AbstractFlinkResource<?, ?>> {
 
             if (JobStatus.CANCELED == currentJobStatus
                     || (currentJobStatus.isGloballyTerminalState()
-                            && 
JobStatus.CANCELLING.name().equals(previousJobStatus))) {
+                            && 
JobStatus.CANCELLING.equals(previousJobStatus))) {
                 // The job was cancelled
                 markSuspended(resource);
             }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
index fd19713a..9020cbfa 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
@@ -135,7 +135,7 @@ public abstract class AbstractFlinkDeploymentObserver
                 checkContainerBackoff(ctx);
             } catch (DeploymentFailedException dfe) {
                 // throw only when not already in error status to allow for 
spec update
-                
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
+                
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING);
                 if (!JobManagerDeploymentStatus.ERROR.equals(
                         deploymentStatus.getJobManagerDeploymentStatus())) {
                     throw dfe;
@@ -149,7 +149,7 @@ public abstract class AbstractFlinkDeploymentObserver
         }
 
         
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
-        deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
+        deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING);
 
         if (previousJmStatus != JobManagerDeploymentStatus.MISSING
                 && previousJmStatus != JobManagerDeploymentStatus.ERROR) {
@@ -192,7 +192,7 @@ public abstract class AbstractFlinkDeploymentObserver
         FlinkDeploymentStatus status = dep.getStatus();
         var reconciliationStatus = status.getReconciliationStatus();
         if (status.getJobManagerDeploymentStatus() != 
JobManagerDeploymentStatus.ERROR
-                && 
!JobStatus.FAILED.name().equals(dep.getStatus().getJobStatus().getState())
+                && 
!JobStatus.FAILED.equals(dep.getStatus().getJobStatus().getState())
                 && reconciliationStatus.isLastReconciledSpecStable()) {
             status.setError(null);
         }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 15e15141..8dba93bc 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -55,7 +55,10 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.function.BiConsumer;
 
+import static org.apache.flink.api.common.JobStatus.CANCELED;
+import static org.apache.flink.api.common.JobStatus.CANCELLING;
 import static org.apache.flink.api.common.JobStatus.FINISHED;
+import static org.apache.flink.api.common.JobStatus.RECONCILING;
 import static org.apache.flink.api.common.JobStatus.RUNNING;
 import static 
org.apache.flink.kubernetes.operator.utils.FlinkResourceExceptionUtils.updateFlinkResourceException;
 import static 
org.apache.flink.kubernetes.operator.utils.FlinkResourceExceptionUtils.updateFlinkStateSnapshotException;
@@ -233,7 +236,7 @@ public class ReconciliationUtils {
         }
     }
 
-    public static void updateForReconciliationError(FlinkResourceContext ctx, 
Throwable error) {
+    public static void updateForReconciliationError(FlinkResourceContext<?> 
ctx, Throwable error) {
         updateFlinkResourceException(error, ctx.getResource(), 
ctx.getOperatorConfig());
     }
 
@@ -352,35 +355,23 @@ public class ReconciliationUtils {
 
     public static boolean isJobInTerminalState(CommonStatus<?> status) {
         var jobState = status.getJobStatus().getState();
-        if (jobState == null) {
-            jobState = 
org.apache.flink.api.common.JobStatus.RECONCILING.name();
-        }
-        return 
org.apache.flink.api.common.JobStatus.valueOf(jobState).isGloballyTerminalState();
+        return jobState != null && jobState.isGloballyTerminalState();
     }
 
     public static boolean isJobRunning(CommonStatus<?> status) {
-        return org.apache.flink.api.common.JobStatus.RUNNING
-                .name()
-                .equals(status.getJobStatus().getState());
+        return RUNNING == status.getJobStatus().getState();
     }
 
     public static boolean isJobCancelled(CommonStatus<?> status) {
-        return org.apache.flink.api.common.JobStatus.CANCELED
-                .name()
-                .equals(status.getJobStatus().getState());
+        return CANCELED == status.getJobStatus().getState();
     }
 
     public static boolean isJobCancellable(CommonStatus<?> status) {
-        return !org.apache.flink.api.common.JobStatus.RECONCILING
-                .name()
-                .equals(status.getJobStatus().getState());
+        return RECONCILING != status.getJobStatus().getState();
     }
 
     public static boolean isJobCancelling(CommonStatus<?> status) {
-        return status.getJobStatus() != null
-                && org.apache.flink.api.common.JobStatus.CANCELLING
-                        .name()
-                        .equals(status.getJobStatus().getState());
+        return status.getJobStatus() != null && CANCELLING == 
status.getJobStatus().getState();
     }
 
     /**
@@ -503,8 +494,7 @@ public class ReconciliationUtils {
      * @param status Status to be updated.
      */
     public static void checkAndUpdateStableSpec(CommonStatus<?> status) {
-        var flinkJobStatus =
-                
org.apache.flink.api.common.JobStatus.valueOf(status.getJobStatus().getState());
+        var flinkJobStatus = status.getJobStatus().getState();
 
         if (status.getReconciliationStatus().getState() != 
ReconciliationState.DEPLOYED) {
             return;
@@ -542,8 +532,7 @@ public class ReconciliationUtils {
         var lastJobSpec = lastSpecWithMeta.getSpec().getJob();
         if (lastJobSpec != null) {
             lastJobSpec.setState(JobState.RUNNING);
-            status.getJobStatus()
-                    
.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+            status.getJobStatus().setState(RECONCILING);
         }
         reconciliationStatus.setState(ReconciliationState.DEPLOYED);
         reconciliationStatus.setLastReconciledSpec(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index cc57889b..757e1a7c 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -18,7 +18,6 @@
 package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.autoscaler.JobAutoScaler;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
@@ -511,7 +510,7 @@ public abstract class AbstractFlinkResourceReconciler<
         boolean nonTerminalApplication =
                 !sessionCluster
                         && deployedJob.getState() == JobState.RUNNING
-                        && 
!JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState();
+                        && !jobStatus.getState().isGloballyTerminalState();
         boolean jmShouldBeRunning = sessionCluster || nonTerminalApplication;
         return jmShouldBeRunning
                 && (status.getJobManagerDeploymentStatus() == 
JobManagerDeploymentStatus.MISSING);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 594632b2..a6db17b9 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -420,8 +420,7 @@ public abstract class AbstractJobReconciler<
     @Override
     public boolean reconcileOtherChanges(FlinkResourceContext<CR> ctx) throws 
Exception {
         var status = ctx.getResource().getStatus();
-        var jobStatus =
-                
org.apache.flink.api.common.JobStatus.valueOf(status.getJobStatus().getState());
+        var jobStatus = status.getJobStatus().getState();
         if (jobStatus == org.apache.flink.api.common.JobStatus.FAILED
                 && 
ctx.getObserveConfig().getBoolean(OPERATOR_JOB_RESTART_FAILED)) {
             LOG.info("Stopping failed Flink job...");
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index a078a593..71cf4179 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -124,7 +124,7 @@ public class ApplicationReconciler
 
     private void deleteJmThatNeverStarted(
             FlinkService flinkService, FlinkDeployment deployment, 
Configuration deployConfig) {
-        
deployment.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
+        deployment.getStatus().getJobStatus().setState(JobStatus.FAILED);
         flinkService.deleteClusterDeployment(
                 deployment.getMetadata(), deployment.getStatus(), 
deployConfig, false);
         LOG.info("Deleted application cluster that never started.");
@@ -181,7 +181,7 @@ public class ApplicationReconciler
                 MSG_SUBMIT,
                 ctx.getKubernetesClient());
         flinkService.submitApplicationCluster(spec.getJob(), deployConfig, 
requireHaMetadata);
-        
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+        
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING);
         
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
 
         IngressUtils.updateIngressRules(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index af227a93..f1fa8885 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -94,7 +94,7 @@ public class SessionJobReconciler
                         savepoint.orElse(null));
 
         var status = ctx.getResource().getStatus();
-        
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+        
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING);
     }
 
     @Override
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index c89baf89..68d7fe89 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -343,7 +343,7 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                     deployment.getMetadata(), status, conf, 
suspendMode.deleteHaMeta());
         }
 
-        status.getJobStatus().setState(JobStatus.FINISHED.name());
+        status.getJobStatus().setState(JobStatus.FINISHED);
         return CancelResult.completed(savepointPath);
     }
 
@@ -366,7 +366,7 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                     break;
             }
         }
-        status.getJobStatus().setState(JobStatus.FINISHED.name());
+        status.getJobStatus().setState(JobStatus.FINISHED);
         status.getJobStatus().setJobId(null);
         return CancelResult.completed(savepointPath);
     }
@@ -404,7 +404,7 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                         "Cancellation Error", 
EventRecorder.Reason.CleanupFailed.name(), e);
             }
         }
-        status.getJobStatus().setState(JobStatus.CANCELLING.name());
+        status.getJobStatus().setState(JobStatus.CANCELLING);
     }
 
     public String savepointJobOrError(
@@ -1037,9 +1037,8 @@ public abstract class AbstractFlinkService implements 
FlinkService {
     protected void updateStatusAfterClusterDeletion(FlinkDeploymentStatus 
status) {
         
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
         var currentJobState = status.getJobStatus().getState();
-        if (currentJobState == null
-                || 
!JobStatus.valueOf(currentJobState).isGloballyTerminalState()) {
-            status.getJobStatus().setState(JobStatus.FINISHED.name());
+        if (currentJobState == null || 
!currentJobState.isGloballyTerminalState()) {
+            status.getJobStatus().setState(JobStatus.FINISHED);
         }
     }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index ffd071e6..a5a63f8b 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -262,7 +262,7 @@ public class NativeFlinkService extends 
AbstractFlinkService {
 
         var status = resource.getStatus();
         if (ReconciliationUtils.isJobInTerminalState(status)
-                || 
JobStatus.RECONCILING.name().equals(status.getJobStatus().getState())) {
+                || 
JobStatus.RECONCILING.equals(status.getJobStatus().getState())) {
             LOG.info("Job in terminal or reconciling state cannot be scaled 
in-place");
             return false;
         }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
index 67857b52..76d03c9b 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.kubernetes.operator.controller;
 
-import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
@@ -35,6 +34,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
 
+import static org.apache.flink.api.common.JobStatus.RUNNING;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -101,7 +101,7 @@ public class DeploymentRecoveryTest {
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
-        assertEquals(JobStatus.RUNNING.name(), 
appCluster.getStatus().getJobStatus().getState());
+        assertEquals(RUNNING, 
appCluster.getStatus().getJobStatus().getState());
 
         // Remove deployment
         flinkService.setPortReady(false);
@@ -130,7 +130,7 @@ public class DeploymentRecoveryTest {
             assertEquals(
                     JobManagerDeploymentStatus.READY,
                     appCluster.getStatus().getJobManagerDeploymentStatus());
-            assertEquals("RUNNING", 
appCluster.getStatus().getJobStatus().getState());
+            assertEquals(RUNNING, 
appCluster.getStatus().getJobStatus().getState());
             assertEquals(
                     appCluster.getSpec(),
                     appCluster
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
index 8fcad0a5..c84fc74c 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
@@ -42,6 +42,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Optional;
 
+import static org.apache.flink.api.common.JobStatus.RUNNING;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_RESTART_FAILED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -85,7 +86,7 @@ public class FailedDeploymentRestartTest extends 
OperatorTestBase {
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
-        assertEquals("RUNNING", 
appCluster.getStatus().getJobStatus().getState());
+        assertEquals(RUNNING, 
appCluster.getStatus().getJobStatus().getState());
 
         // Make deployment unhealthy
         flinkService.markApplicationJobFailedWithError(
@@ -101,7 +102,7 @@ public class FailedDeploymentRestartTest extends 
OperatorTestBase {
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
-        assertEquals("RUNNING", 
appCluster.getStatus().getJobStatus().getState());
+        assertEquals(RUNNING, 
appCluster.getStatus().getJobStatus().getState());
 
         // We started without savepoint
         appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
@@ -129,7 +130,7 @@ public class FailedDeploymentRestartTest extends 
OperatorTestBase {
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
-        assertEquals("RUNNING", 
appCluster.getStatus().getJobStatus().getState());
+        assertEquals(RUNNING, 
appCluster.getStatus().getJobStatus().getState());
         
assertNull(flinkService.getSubmittedConf().get(SavepointConfigOptions.SAVEPOINT_PATH));
 
         // trigger checkpoint
@@ -157,7 +158,7 @@ public class FailedDeploymentRestartTest extends 
OperatorTestBase {
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
-        assertEquals("RUNNING", 
appCluster.getStatus().getJobStatus().getState());
+        assertEquals(RUNNING, 
appCluster.getStatus().getJobStatus().getState());
 
         // check savepoint_path
         if (upgradeMode != UpgradeMode.STATELESS) {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 32e49502..08937e70 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -128,7 +128,7 @@ public class FlinkDeploymentControllerTest {
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
         assertEquals(
-                org.apache.flink.api.common.JobStatus.RUNNING.name(),
+                org.apache.flink.api.common.JobStatus.RUNNING,
                 appCluster.getStatus().getJobStatus().getState());
         assertEquals(7, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
@@ -147,7 +147,7 @@ public class FlinkDeploymentControllerTest {
         JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
         assertEquals(expectedJobStatus.getJobId().toHexString(), 
jobStatus.getJobId());
         assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
-        assertEquals(expectedJobStatus.getJobState().toString(), 
jobStatus.getState());
+        assertEquals(expectedJobStatus.getJobState(), jobStatus.getState());
 
         // Validate last stable spec is still the old one
         assertEquals(
@@ -350,7 +350,7 @@ public class FlinkDeploymentControllerTest {
                 JobManagerDeploymentStatus.ERROR,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
         assertEquals(
-                org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+                org.apache.flink.api.common.JobStatus.RECONCILING,
                 appCluster.getStatus().getJobStatus().getState());
 
         // Validate status status
@@ -604,7 +604,7 @@ public class FlinkDeploymentControllerTest {
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
         assertEquals(
-                org.apache.flink.api.common.JobStatus.RUNNING.name(),
+                org.apache.flink.api.common.JobStatus.RUNNING,
                 appCluster.getStatus().getJobStatus().getState());
         assertEquals(
                 JobState.RUNNING,
@@ -693,8 +693,7 @@ public class FlinkDeploymentControllerTest {
                 JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
         // jobStatus has not been set at this time
-        assertEquals(
-                org.apache.flink.api.common.JobStatus.RECONCILING.name(), 
jobStatus.getState());
+        assertEquals(org.apache.flink.api.common.JobStatus.RECONCILING, 
jobStatus.getState());
 
         // Switches operator mode to SESSION
         appCluster.getSpec().setJob(null);
@@ -716,7 +715,7 @@ public class FlinkDeploymentControllerTest {
         JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
         assertEquals(expectedJobStatus.getJobId().toHexString(), 
jobStatus.getJobId());
         assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
-        assertEquals(expectedJobStatus.getJobState().toString(), 
jobStatus.getState());
+        assertEquals(expectedJobStatus.getJobState(), jobStatus.getState());
     }
 
     @Test
@@ -798,11 +797,11 @@ public class FlinkDeploymentControllerTest {
         testController.reconcile(appCluster, context);
         if (appCluster.getSpec().getJob() != null) {
             assertEquals(
-                    org.apache.flink.api.common.JobStatus.RUNNING.name(),
+                    org.apache.flink.api.common.JobStatus.RUNNING,
                     appCluster.getStatus().getJobStatus().getState());
         } else {
             assertEquals(
-                    org.apache.flink.api.common.JobStatus.FINISHED.name(),
+                    org.apache.flink.api.common.JobStatus.FINISHED,
                     appCluster.getStatus().getJobStatus().getState());
         }
         assertEquals(
@@ -866,7 +865,7 @@ public class FlinkDeploymentControllerTest {
         testController.reconcile(appCluster, context);
 
         assertEquals(
-                org.apache.flink.api.common.JobStatus.RUNNING.name(),
+                org.apache.flink.api.common.JobStatus.RUNNING,
                 appCluster.getStatus().getJobStatus().getState());
         assertEquals(
                 JobManagerDeploymentStatus.READY,
@@ -894,7 +893,7 @@ public class FlinkDeploymentControllerTest {
         testController.reconcile(appCluster, context);
 
         assertEquals(
-                org.apache.flink.api.common.JobStatus.RUNNING.name(),
+                org.apache.flink.api.common.JobStatus.RUNNING,
                 appCluster.getStatus().getJobStatus().getState());
         assertEquals(
                 JobManagerDeploymentStatus.READY,
@@ -1108,7 +1107,7 @@ public class FlinkDeploymentControllerTest {
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
         assertEquals(
-                org.apache.flink.api.common.JobStatus.RUNNING.name(),
+                org.apache.flink.api.common.JobStatus.RUNNING,
                 appCluster.getStatus().getJobStatus().getState());
     }
 
@@ -1154,7 +1153,7 @@ public class FlinkDeploymentControllerTest {
                 JobManagerDeploymentStatus.DEPLOYING,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
         assertEquals(
-                org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+                org.apache.flink.api.common.JobStatus.RECONCILING,
                 appCluster.getStatus().getJobStatus().getState());
         assertEquals(4, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
@@ -1177,7 +1176,7 @@ public class FlinkDeploymentControllerTest {
                 JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
         assertEquals(
-                org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+                org.apache.flink.api.common.JobStatus.RECONCILING,
                 appCluster.getStatus().getJobStatus().getState());
         assertEquals(5, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
@@ -1191,7 +1190,7 @@ public class FlinkDeploymentControllerTest {
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
         assertEquals(
-                org.apache.flink.api.common.JobStatus.RUNNING.name(),
+                org.apache.flink.api.common.JobStatus.RUNNING,
                 appCluster.getStatus().getJobStatus().getState());
         assertEquals(6, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
@@ -1206,7 +1205,7 @@ public class FlinkDeploymentControllerTest {
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
         assertEquals(
-                org.apache.flink.api.common.JobStatus.RUNNING.name(),
+                org.apache.flink.api.common.JobStatus.RUNNING,
                 appCluster.getStatus().getJobStatus().getState());
         assertEquals(6, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
@@ -1220,7 +1219,7 @@ public class FlinkDeploymentControllerTest {
         JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
         assertEquals(expectedJobStatus.getJobId().toHexString(), 
jobStatus.getJobId());
         assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
-        assertEquals(expectedJobStatus.getJobState().toString(), 
jobStatus.getState());
+        assertEquals(expectedJobStatus.getJobState(), jobStatus.getState());
         assertEquals(
                 
appCluster.getStatus().getReconciliationStatus().getLastReconciledSpec(),
                 
appCluster.getStatus().getReconciliationStatus().getLastStableSpec());
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
index 5b3578f3..82ccaea5 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.kubernetes.operator.controller;
 
-import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
@@ -51,6 +50,9 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.api.common.JobStatus.CANCELLING;
+import static org.apache.flink.api.common.JobStatus.RECONCILING;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
 import static 
org.apache.flink.kubernetes.operator.TestUtils.MAX_RECONCILE_TIMES;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED;
 import static 
org.apache.flink.kubernetes.operator.utils.EventRecorder.Reason.ValidationError;
@@ -119,7 +121,7 @@ class FlinkSessionJobControllerTest {
         sessionJob.getSpec().getJob().setParallelism(-1);
         updateControl = testController.reconcile(sessionJob, context);
 
-        assertEquals(JobStatus.RUNNING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(RUNNING, 
sessionJob.getStatus().getJobStatus().getState());
         assertEquals(6, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
 
@@ -137,7 +139,7 @@ class FlinkSessionJobControllerTest {
         JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
         assertEquals(expectedJobStatus.getJobId().toHexString(), 
jobStatus.getJobId());
         assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
-        assertEquals(expectedJobStatus.getJobState().toString(), 
jobStatus.getState());
+        assertEquals(expectedJobStatus.getJobState(), jobStatus.getState());
 
         // Validate last stable spec is still the old one
         assertEquals(
@@ -271,7 +273,7 @@ class FlinkSessionJobControllerTest {
         testController.reconcile(sessionJob, context);
 
         // Make sure we are cancelling
-        assertEquals("CANCELLING", 
sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(CANCELLING, 
sessionJob.getStatus().getJobStatus().getState());
 
         // Once cancelling completed make sure that last reconciled spec is 
correctly upgraded and
         // job was started from cp
@@ -285,7 +287,7 @@ class FlinkSessionJobControllerTest {
                         .deserializeLastReconciledSpec()
                         .getJob()
                         .getUpgradeMode());
-        assertEquals("RECONCILING", 
sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(RECONCILING, 
sessionJob.getStatus().getJobStatus().getState());
         assertEquals(
                 ReconciliationState.DEPLOYED,
                 sessionJob.getStatus().getReconciliationStatus().getState());
@@ -296,7 +298,7 @@ class FlinkSessionJobControllerTest {
         assertEquals("cp1", jobs.get(0).f0);
 
         testController.reconcile(sessionJob, context);
-        assertEquals("RUNNING", 
sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(RUNNING, 
sessionJob.getStatus().getJobStatus().getState());
 
         // Suspend job
         flinkService.setCheckpointInfo(
@@ -330,7 +332,7 @@ class FlinkSessionJobControllerTest {
         testController.reconcile(sessionJob, context);
 
         // Make sure we are cancelling
-        assertEquals("CANCELLING", 
sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(CANCELLING, 
sessionJob.getStatus().getJobStatus().getState());
         testController.events().poll();
         assertEquals(
                 testController.events().poll().getReason(),
@@ -342,7 +344,7 @@ class FlinkSessionJobControllerTest {
 
         testController.reconcile(sessionJob, context);
         assertEquals(JobStatusObserver.JOB_NOT_FOUND_ERR, 
sessionJob.getStatus().getError());
-        assertEquals("RECONCILING", 
sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(RECONCILING, 
sessionJob.getStatus().getJobStatus().getState());
         assertEquals(
                 ReconciliationState.DEPLOYED,
                 sessionJob.getStatus().getReconciliationStatus().getState());
@@ -496,7 +498,7 @@ class FlinkSessionJobControllerTest {
                 EventRecorder.Reason.JobStatusChanged,
                 EventRecorder.Reason.valueOf(statusEvents.get(2).getReason()));
 
-        assertEquals(JobStatus.RUNNING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(RUNNING, 
sessionJob.getStatus().getJobStatus().getState());
         assertEquals(
                 JobState.RUNNING,
                 sessionJob
@@ -518,8 +520,7 @@ class FlinkSessionJobControllerTest {
                 
.put(KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER.key(), "changed");
         updateControl = testController.reconcile(sessionJob, context);
         assertFalse(updateControl.isUpdateStatus());
-        assertEquals(
-                JobStatus.RECONCILING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(RECONCILING, 
sessionJob.getStatus().getJobStatus().getState());
 
         // Check when the bad config is applied, observe() will change the 
cluster state correctly
         sessionJob.getSpec().getJob().setParallelism(-1);
@@ -531,7 +532,7 @@ class FlinkSessionJobControllerTest {
                         .getError()
                         .contains("Job parallelism must be larger than 0"));
         assertFalse(updateControl.isUpdateStatus());
-        assertEquals(JobStatus.RUNNING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(RUNNING, 
sessionJob.getStatus().getJobStatus().getState());
 
         // Make sure we do validation before getting effective config in 
reconcile().
         // Verify the saved headers in lastReconciledSpec is actually used in 
observe() by
@@ -547,7 +548,7 @@ class FlinkSessionJobControllerTest {
                                 configuration.get(
                                         
KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER)));
         testController.reconcile(sessionJob, context);
-        assertEquals(JobStatus.RUNNING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(RUNNING, 
sessionJob.getStatus().getJobStatus().getState());
     }
 
     @Test
@@ -572,7 +573,7 @@ class FlinkSessionJobControllerTest {
         
assertNull(sessionJob.getStatus().getReconciliationStatus().getLastStableSpec());
 
         testController.reconcile(sessionJob, context);
-        assertEquals(JobStatus.RUNNING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(RUNNING, 
sessionJob.getStatus().getJobStatus().getState());
         assertNull(sessionJob.getStatus().getError());
 
         assertEquals(
@@ -649,7 +650,7 @@ class FlinkSessionJobControllerTest {
 
         var deleteControl = testController.cleanup(sessionJob, context);
 
-        assertEquals("CANCELLING", 
sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(CANCELLING, 
sessionJob.getStatus().getJobStatus().getState());
         assertFalse(deleteControl.isRemoveFinalizer());
         assertEquals(
                 
configManager.getOperatorConfiguration().getProgressCheckInterval().toMillis(),
@@ -688,8 +689,7 @@ class FlinkSessionJobControllerTest {
                 testController.reconcile(sessionJob, context);
 
         // Reconciling
-        assertEquals(
-                JobStatus.RECONCILING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(RECONCILING, 
sessionJob.getStatus().getJobStatus().getState());
         assertEquals(4, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
@@ -706,7 +706,7 @@ class FlinkSessionJobControllerTest {
 
         // Running
         updateControl = testController.reconcile(sessionJob, context);
-        assertEquals(JobStatus.RUNNING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(RUNNING, 
sessionJob.getStatus().getJobStatus().getState());
         assertEquals(5, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
@@ -716,7 +716,7 @@ class FlinkSessionJobControllerTest {
 
         // Stable loop
         updateControl = testController.reconcile(sessionJob, context);
-        assertEquals(JobStatus.RUNNING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(RUNNING, 
sessionJob.getStatus().getJobStatus().getState());
         assertEquals(5, testController.getInternalStatusUpdateCount());
         assertFalse(updateControl.isUpdateStatus());
         assertEquals(
@@ -729,7 +729,7 @@ class FlinkSessionJobControllerTest {
         JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
         assertEquals(expectedJobStatus.getJobId().toHexString(), 
jobStatus.getJobId());
         assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
-        assertEquals(expectedJobStatus.getJobState().toString(), 
jobStatus.getState());
+        assertEquals(expectedJobStatus.getJobState(), jobStatus.getState());
         assertEquals(
                 
sessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec(),
                 
sessionJob.getStatus().getReconciliationStatus().getLastStableSpec());
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
index 9b68bea6..593841b5 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
@@ -61,6 +61,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.function.BiConsumer;
 
+import static org.apache.flink.api.common.JobStatus.CANCELED;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
 import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.ABANDONED;
 import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED;
 import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.FAILED;
@@ -188,7 +190,7 @@ public class FlinkStateSnapshotControllerTest {
         controller.reconcile(snapshot, context);
         assertThat(snapshot.getStatus().getState()).isEqualTo(IN_PROGRESS);
 
-        deployment.getStatus().getJobStatus().setState("CANCELED");
+        deployment.getStatus().getJobStatus().setState(CANCELED);
         controller.reconcile(snapshot, context);
         var status = snapshot.getStatus();
         var createdAt = 
Instant.parse(snapshot.getMetadata().getCreationTimestamp());
@@ -281,14 +283,14 @@ public class FlinkStateSnapshotControllerTest {
         assertThat(flinkService.getDisposedSavepoints()).isEmpty();
 
         // Failed dispose, job not running
-        deployment.getStatus().getJobStatus().setState("CANCELED");
+        deployment.getStatus().getJobStatus().setState(CANCELED);
         snapshot.getSpec().getSavepoint().setDisposeOnDelete(true);
         assertDeleteControl(
                 controller.cleanup(snapshot, context),
                 false,
                 
configManager.getOperatorConfiguration().getReconcileInterval().toMillis());
         assertThat(flinkService.getDisposedSavepoints()).isEmpty();
-        deployment.getStatus().getJobStatus().setState("RUNNING");
+        deployment.getStatus().getJobStatus().setState(RUNNING);
 
         // Failed dispose, REST error
         snapshot.getSpec().getSavepoint().setDisposeOnDelete(true);
@@ -565,7 +567,7 @@ public class FlinkStateSnapshotControllerTest {
     @Test
     public void testReconcileJobNotRunning() {
         var deployment = createDeployment();
-        deployment.getStatus().getJobStatus().setState("CANCELED");
+        deployment.getStatus().getJobStatus().setState(CANCELED);
         context = TestUtils.createSnapshotContext(client, deployment);
         var snapshot = createSavepoint(deployment);
         var errorMessage =
@@ -670,7 +672,7 @@ public class FlinkStateSnapshotControllerTest {
         var deployment = TestUtils.buildApplicationCluster();
         deployment
                 .getStatus()
-                
.setJobStatus(JobStatus.builder().state("RUNNING").jobId(JOB_ID).build());
+                
.setJobStatus(JobStatus.builder().state(RUNNING).jobId(JOB_ID).build());
         deployment.getSpec().setFlinkVersion(flinkVersion);
         deployment
                 .getSpec()
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
index 753a241e..83fb7624 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
@@ -45,6 +45,7 @@ import java.time.Duration;
 import java.util.LinkedList;
 import java.util.Map;
 
+import static org.apache.flink.api.common.JobStatus.RUNNING;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -110,7 +111,7 @@ public class RollbackTest {
                     testController.reconcile(dep, context);
                 },
                 () -> {
-                    assertEquals("RUNNING", 
dep.getStatus().getJobStatus().getState());
+                    assertEquals(RUNNING, 
dep.getStatus().getJobStatus().getState());
                     assertEquals(1, flinkService.listJobs().size());
                     dep.getSpec().setRestartNonce(10L);
                     testController.reconcile(dep, context);
@@ -154,7 +155,7 @@ public class RollbackTest {
                     testController.reconcile(dep, context);
                 },
                 () -> {
-                    assertEquals("RUNNING", 
dep.getStatus().getJobStatus().getState());
+                    assertEquals(RUNNING, 
dep.getStatus().getJobStatus().getState());
                     assertEquals(1, flinkService.listJobs().size());
                     dep.getSpec().setRestartNonce(10L);
                     testController.reconcile(dep, context);
@@ -243,7 +244,7 @@ public class RollbackTest {
                     testController.reconcile(dep, context);
                 },
                 () -> {
-                    assertEquals("RUNNING", 
dep.getStatus().getJobStatus().getState());
+                    assertEquals(RUNNING, 
dep.getStatus().getJobStatus().getState());
                     assertEquals(1, flinkService.listJobs().size());
 
                     // Trigger deployment recovery
@@ -316,7 +317,7 @@ public class RollbackTest {
                     testController.reconcile(dep, context);
                 },
                 () -> {
-                    assertEquals("RUNNING", 
dep.getStatus().getJobStatus().getState());
+                    assertEquals(RUNNING, 
dep.getStatus().getJobStatus().getState());
                     // Make sure we started from empty state even if savepoint 
was available
                     assertNull(new 
LinkedList<>(flinkService.listJobs()).getLast().f0);
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
index c7b26d1b..e19089c5 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
@@ -36,6 +36,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 
 import java.time.Duration;
 
+import static org.apache.flink.api.common.JobStatus.RUNNING;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
@@ -95,7 +96,7 @@ public class UnhealthyDeploymentRestartTest {
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
-        assertEquals("RUNNING", 
appCluster.getStatus().getJobStatus().getState());
+        assertEquals(RUNNING, 
appCluster.getStatus().getJobStatus().getState());
 
         // Make deployment unhealthy
         flinkService.setMetricValue(NUM_RESTARTS_METRIC_NAME, "100");
@@ -111,7 +112,7 @@ public class UnhealthyDeploymentRestartTest {
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
-        assertEquals("RUNNING", 
appCluster.getStatus().getJobStatus().getState());
+        assertEquals(RUNNING, 
appCluster.getStatus().getJobStatus().getState());
     }
 
     @ParameterizedTest
@@ -129,7 +130,7 @@ public class UnhealthyDeploymentRestartTest {
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
-        assertEquals("RUNNING", 
appCluster.getStatus().getJobStatus().getState());
+        assertEquals(RUNNING, 
appCluster.getStatus().getJobStatus().getState());
 
         // Make deployment unhealthy
         
flinkService.setMetricValue(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME, "1");
@@ -153,6 +154,6 @@ public class UnhealthyDeploymentRestartTest {
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 appCluster.getStatus().getJobManagerDeploymentStatus());
-        assertEquals("RUNNING", 
appCluster.getStatus().getJobStatus().getState());
+        assertEquals(RUNNING, 
appCluster.getStatus().getJobStatus().getState());
     }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
index 2620fb9b..a399d871 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
@@ -77,7 +77,7 @@ public class ResourceLifecycleMetricsTest {
         application.getStatus().setError("errr");
         assertEquals(STABLE, application.getStatus().getLifecycleState());
 
-        
application.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
+        application.getStatus().getJobStatus().setState(JobStatus.FAILED);
         assertEquals(FAILED, application.getStatus().getLifecycleState());
 
         application.getStatus().setError(null);
@@ -88,14 +88,14 @@ public class ResourceLifecycleMetricsTest {
                 .setState(ReconciliationState.ROLLING_BACK);
         assertEquals(ROLLING_BACK, 
application.getStatus().getLifecycleState());
 
-        
application.getStatus().getJobStatus().setState(JobStatus.RECONCILING.name());
+        application.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
         
application.getStatus().getReconciliationStatus().setState(ReconciliationState.ROLLED_BACK);
         assertEquals(ROLLED_BACK, application.getStatus().getLifecycleState());
 
-        
application.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
+        application.getStatus().getJobStatus().setState(JobStatus.FAILED);
         assertEquals(FAILED, application.getStatus().getLifecycleState());
 
-        
application.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+        application.getStatus().getJobStatus().setState(JobStatus.RUNNING);
         application.getSpec().getJob().setState(JobState.SUSPENDED);
         ReconciliationUtils.updateStatusForDeployedSpec(application, new 
Configuration());
         assertEquals(SUSPENDED, application.getStatus().getLifecycleState());
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
index f5b38887..512e9f7c 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
@@ -63,7 +63,7 @@ public class JobStatusObserverTest extends OperatorTestBase {
         job.getSpec().getJob().setUpgradeMode(upgradeMode);
         var status = job.getStatus();
         var jobStatus = status.getJobStatus();
-        jobStatus.setState(fromStatus.name());
+        jobStatus.setState(fromStatus);
         assertEquals(
                 JobState.RUNNING,
                 status.getReconciliationStatus()
@@ -91,7 +91,7 @@ public class JobStatusObserverTest extends OperatorTestBase {
         var deployment = initDeployment();
         var status = deployment.getStatus();
         var jobStatus = status.getJobStatus();
-        jobStatus.setState(fromStatus.name());
+        jobStatus.setState(fromStatus);
         assertEquals(
                 JobState.RUNNING,
                 status.getReconciliationStatus()
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java
index fdbfb877..0eee4712 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.observer;
 
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.kubernetes.operator.OperatorTestBase;
@@ -328,7 +329,7 @@ public class SnapshotObserverLegacyTest extends 
OperatorTestBase {
         var jobStatus = status.getJobStatus();
         status.getReconciliationStatus()
                 .serializeAndSetLastReconciledSpec(deployment.getSpec(), 
deployment);
-        jobStatus.setState("RUNNING");
+        jobStatus.setState(JobStatus.RUNNING);
 
         var savepointInfo = jobStatus.getSavepointInfo();
         flinkService.triggerSavepointLegacy(null, 
SnapshotTriggerType.PERIODIC, deployment, conf);
@@ -364,7 +365,7 @@ public class SnapshotObserverLegacyTest extends 
OperatorTestBase {
         var jobStatus = status.getJobStatus();
         status.getReconciliationStatus()
                 .serializeAndSetLastReconciledSpec(deployment.getSpec(), 
deployment);
-        jobStatus.setState("RUNNING");
+        jobStatus.setState(JobStatus.RUNNING);
 
         var checkpointInfo = jobStatus.getCheckpointInfo();
         var triggerId = flinkService.triggerCheckpoint(null, 
CheckpointType.FULL, conf);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 2f06504b..b9adee62 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -128,7 +128,9 @@ public class ApplicationObserverTest extends 
OperatorTestBase {
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
-        assertEquals(JobState.RUNNING.name(), 
deployment.getStatus().getJobStatus().getState());
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.RUNNING,
+                deployment.getStatus().getJobStatus().getState());
         assertEquals(
                 
deployment.getStatus().getReconciliationStatus().getLastReconciledSpec(),
                 
deployment.getStatus().getReconciliationStatus().getLastStableSpec());
@@ -137,7 +139,9 @@ public class ApplicationObserverTest extends 
OperatorTestBase {
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
-        assertEquals(JobState.RUNNING.name(), 
deployment.getStatus().getJobStatus().getState());
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.RUNNING,
+                deployment.getStatus().getJobStatus().getState());
 
         assertEquals(
                 deployment.getMetadata().getName(),
@@ -177,7 +181,7 @@ public class ApplicationObserverTest extends 
OperatorTestBase {
                 JobManagerDeploymentStatus.READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
         assertEquals(
-                org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+                org.apache.flink.api.common.JobStatus.RECONCILING,
                 deployment.getStatus().getJobStatus().getState());
         
assertNull(deployment.getStatus().getReconciliationStatus().getLastStableSpec());
     }
@@ -488,7 +492,7 @@ public class ApplicationObserverTest extends 
OperatorTestBase {
 
         observer.observe(deployment, readyContext);
         assertEquals(
-                org.apache.flink.api.common.JobStatus.FAILED.name(),
+                org.apache.flink.api.common.JobStatus.FAILED,
                 deployment.getStatus().getJobStatus().getState());
         assertEquals("last-SP", 
deployment.getStatus().getJobStatus().getUpgradeSavepointPath());
         
assertFalse(SnapshotUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
@@ -653,8 +657,7 @@ public class ApplicationObserverTest extends 
OperatorTestBase {
 
         observer.observe(deployment, readyContext);
         assertEquals(
-                org.apache.flink.api.common.JobStatus.FAILED.name(),
-                getJobStatus(deployment).getState());
+                org.apache.flink.api.common.JobStatus.FAILED, 
getJobStatus(deployment).getState());
         
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(deployment)));
     }
 
@@ -745,7 +748,7 @@ public class ApplicationObserverTest extends 
OperatorTestBase {
         ReconciliationUtils.updateStatusForDeployedSpec(deployment, new 
Configuration());
         JobStatus jobStatus = deployment.getStatus().getJobStatus();
         jobStatus.setJobName("jobname");
-        jobStatus.setState(JobState.RUNNING.name());
+        jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING);
         deployment.getStatus().setJobStatus(jobStatus);
         
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
     }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
index 1b6bdb0b..530d3584 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
@@ -85,12 +85,12 @@ public class FlinkSessionJobObserverTest extends 
OperatorTestBase {
         var jobID = sessionJob.getStatus().getJobStatus().getJobId();
         Assertions.assertNotNull(jobID);
         Assertions.assertEquals(
-                JobStatus.RECONCILING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+                JobStatus.RECONCILING, 
sessionJob.getStatus().getJobStatus().getState());
 
         // observe with empty context will do nothing
         observer.observe(sessionJob, TestUtils.createEmptyContext());
         Assertions.assertEquals(
-                JobStatus.RECONCILING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+                JobStatus.RECONCILING, 
sessionJob.getStatus().getJobStatus().getState());
 
         var reconStatus = sessionJob.getStatus().getReconciliationStatus();
         Assertions.assertNotEquals(
@@ -99,22 +99,22 @@ public class FlinkSessionJobObserverTest extends 
OperatorTestBase {
         // observe with ready context
         observer.observe(sessionJob, readyContext);
         Assertions.assertEquals(
-                JobStatus.RUNNING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+                JobStatus.RUNNING, 
sessionJob.getStatus().getJobStatus().getState());
         Assertions.assertEquals(
                 reconStatus.getLastReconciledSpec(), 
reconStatus.getLastStableSpec());
 
         flinkService.setPortReady(false);
         observer.observe(sessionJob, readyContext);
         Assertions.assertEquals(
-                JobStatus.RECONCILING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+                JobStatus.RECONCILING, 
sessionJob.getStatus().getJobStatus().getState());
 
-        
sessionJob.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+        sessionJob.getStatus().getJobStatus().setState(JobStatus.RUNNING);
         // no matched job id, update the state to unknown
         flinkService.setPortReady(true);
         sessionJob.getStatus().getJobStatus().setJobId(new 
JobID().toHexString());
         observer.observe(sessionJob, readyContext);
         Assertions.assertEquals(
-                JobStatus.RECONCILING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+                JobStatus.RECONCILING, 
sessionJob.getStatus().getJobStatus().getState());
         Assertions.assertEquals(
                 JobState.SUSPENDED,
                 sessionJob
@@ -138,13 +138,13 @@ public class FlinkSessionJobObserverTest extends 
OperatorTestBase {
         Assertions.assertNotNull(jobID);
         Assertions.assertNotEquals(jobID, jobID2);
         Assertions.assertEquals(
-                JobStatus.RECONCILING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+                JobStatus.RECONCILING, 
sessionJob.getStatus().getJobStatus().getState());
         observer.observe(sessionJob2, readyContext);
         Assertions.assertEquals(
-                JobStatus.RUNNING.name(), 
sessionJob2.getStatus().getJobStatus().getState());
+                JobStatus.RUNNING, 
sessionJob2.getStatus().getJobStatus().getState());
         observer.observe(sessionJob, readyContext);
         Assertions.assertEquals(
-                JobStatus.RUNNING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+                JobStatus.RUNNING, 
sessionJob.getStatus().getJobStatus().getState());
 
         // test error behaviour if job not present
         flinkService.clear();
@@ -153,7 +153,7 @@ public class FlinkSessionJobObserverTest extends 
OperatorTestBase {
 
         observer.observe(sessionJob2, readyContext);
         Assertions.assertEquals(
-                JobStatus.RECONCILING.name(), 
sessionJob2.getStatus().getJobStatus().getState());
+                JobStatus.RECONCILING, 
sessionJob2.getStatus().getJobStatus().getState());
         Assertions.assertTrue(
                 
sessionJob2.getStatus().getError().contains(JobStatusObserver.JOB_NOT_FOUND_ERR));
         Assertions.assertEquals(
@@ -173,14 +173,14 @@ public class FlinkSessionJobObserverTest extends 
OperatorTestBase {
         var jobID = sessionJob.getStatus().getJobStatus().getJobId();
         Assertions.assertNotNull(jobID);
         Assertions.assertEquals(
-                JobStatus.RECONCILING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+                JobStatus.RECONCILING, 
sessionJob.getStatus().getJobStatus().getState());
 
         flinkService.setListJobConsumer(
                 (configuration) ->
                         Assertions.assertEquals(8088, 
configuration.getInteger(RestOptions.PORT)));
         observer.observe(sessionJob, readyContext);
         Assertions.assertEquals(
-                JobStatus.RUNNING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+                JobStatus.RUNNING, 
sessionJob.getStatus().getJobStatus().getState());
     }
 
     @Test
@@ -192,11 +192,11 @@ public class FlinkSessionJobObserverTest extends 
OperatorTestBase {
         var jobID = sessionJob.getStatus().getJobStatus().getJobId();
         Assertions.assertNotNull(jobID);
         Assertions.assertEquals(
-                JobStatus.RECONCILING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+                JobStatus.RECONCILING, 
sessionJob.getStatus().getJobStatus().getState());
 
         observer.observe(sessionJob, readyContext);
         Assertions.assertEquals(
-                JobStatus.RUNNING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+                JobStatus.RUNNING, 
sessionJob.getStatus().getJobStatus().getState());
 
         var savepointInfo = 
sessionJob.getStatus().getJobStatus().getSavepointInfo();
         Assertions.assertFalse(
@@ -238,10 +238,10 @@ public class FlinkSessionJobObserverTest extends 
OperatorTestBase {
         var jobID = sessionJob.getStatus().getJobStatus().getJobId();
         Assertions.assertNotNull(jobID);
         Assertions.assertEquals(
-                JobStatus.RECONCILING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+                JobStatus.RECONCILING, 
sessionJob.getStatus().getJobStatus().getState());
 
         observer.observe(sessionJob, readyContext);
-        assertEquals(JobStatus.RUNNING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(JobStatus.RUNNING, 
sessionJob.getStatus().getJobStatus().getState());
 
         var checkpointInfo = 
sessionJob.getStatus().getJobStatus().getCheckpointInfo();
         
assertFalse(SnapshotUtils.checkpointInProgress(sessionJob.getStatus().getJobStatus()));
@@ -280,7 +280,7 @@ public class FlinkSessionJobObserverTest extends 
OperatorTestBase {
     @ValueSource(booleans = {true, false})
     public void testObserveAlreadySubmitted(boolean submitted) {
         var sessionJob = TestUtils.buildSessionJob();
-        
sessionJob.getStatus().getJobStatus().setState(JobStatus.RECONCILING.name());
+        sessionJob.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
         sessionJob.getMetadata().setGeneration(10L);
         var readyContext = 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
 
@@ -305,7 +305,7 @@ public class FlinkSessionJobObserverTest extends 
OperatorTestBase {
                 submitted ? ReconciliationState.DEPLOYED : 
ReconciliationState.UPGRADING,
                 sessionJob.getStatus().getReconciliationStatus().getState());
         Assertions.assertEquals(
-                submitted ? JobStatus.RUNNING.name() : 
JobStatus.RECONCILING.name(),
+                submitted ? JobStatus.RUNNING : JobStatus.RECONCILING,
                 sessionJob.getStatus().getJobStatus().getState());
     }
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index e00305d3..8bf19ff1 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -109,6 +109,9 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 
+import static org.apache.flink.api.common.JobStatus.FINISHED;
+import static org.apache.flink.api.common.JobStatus.RECONCILING;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
 import static 
org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getCheckpointInfo;
 import static 
org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getJobSpec;
 import static 
org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getJobStatus;
@@ -321,7 +324,7 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
                 .setLastStableSpec(
                         
deployment.getStatus().getReconciliationStatus().getLastReconciledSpec());
         flinkService.setHaDataAvailable(false);
-        getJobStatus(deployment).setState("RECONCILING");
+        getJobStatus(deployment).setState(RECONCILING);
 
         try {
             deployment
@@ -343,7 +346,7 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
         getJobSpec(deployment).setUpgradeMode(UpgradeMode.LAST_STATE);
         deployment.getSpec().setRestartNonce(200L);
         flinkService.setHaDataAvailable(false);
-        getJobStatus(deployment).setState("FINISHED");
+        getJobStatus(deployment).setState(FINISHED);
         
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
         reconciler.reconcile(deployment, context);
         reconciler.reconcile(deployment, context);
@@ -693,7 +696,7 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
                                         
.jobId(runningJobs.get(0).f1.getJobId().toHexString())
                                         
.jobName(runningJobs.get(0).f1.getJobName())
                                         
.updateTime(Long.toString(System.currentTimeMillis()))
-                                        .state("RUNNING")
+                                        .state(RUNNING)
                                         .build());
         
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
     }
@@ -711,7 +714,9 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
         
getJobSpec(spDeployment).setSavepointTriggerNonce(ThreadLocalRandom.current().nextLong());
         reconciler.reconcile(spDeployment, context);
         assertEquals("savepoint_trigger_0", 
getSavepointInfo(spDeployment).getTriggerId());
-        assertEquals(JobState.RUNNING.name(), 
getJobStatus(spDeployment).getState());
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.RUNNING,
+                getJobStatus(spDeployment).getState());
 
         // Force upgrade when savepoint is in progress.
         spDeployment
@@ -724,7 +729,7 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
         reconciler.reconcile(spDeployment, context);
         assertEquals("savepoint_trigger_0", 
getSavepointInfo(spDeployment).getTriggerId());
         assertEquals(
-                org.apache.flink.api.common.JobStatus.FINISHED.name(),
+                org.apache.flink.api.common.JobStatus.FINISHED,
                 getJobStatus(spDeployment).getState());
     }
 
@@ -741,7 +746,7 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
         FlinkDeploymentSpec spec = flinkApp.getSpec();
         Configuration deployConfig = configManager.getDeployConfig(deployMeta, 
spec);
 
-        
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
+        
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED);
         status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
         reconciler
                 .getReconciler()
@@ -750,7 +755,7 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
         String path1 = deployConfig.get(JobResultStoreOptions.STORAGE_PATH);
         Assertions.assertTrue(path1.startsWith(haStoragePath));
 
-        
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
+        
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED);
         status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
         reconciler
                 .getReconciler()
@@ -775,7 +780,7 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
         reconciler.reconcile(deployment, context);
         assertEquals(ReconciliationState.DEPLOYED, reconStatus.getState());
 
-        getJobStatus(deployment).setState(JobState.RUNNING.name());
+        getJobStatus(deployment).setState(RUNNING);
         getJobStatus(deployment)
                 
.setJobId(flinkService.listJobs().get(0).f1.getJobId().toHexString());
 
@@ -951,7 +956,7 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
         assertEquals(
                 ReconciliationState.DEPLOYED,
                 deployment.getStatus().getReconciliationStatus().getState());
-        assertEquals("RUNNING", 
deployment.getStatus().getJobStatus().getState());
+        assertEquals(RUNNING, 
deployment.getStatus().getJobStatus().getState());
 
         // Test overrides are applied correctly
         var v1 = new JobVertexID();
@@ -998,7 +1003,7 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
         FlinkDeploymentSpec spec = flinkApp.getSpec();
         Configuration deployConfig = configManager.getDeployConfig(deployMeta, 
spec);
 
-        
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
+        
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED);
         status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
         reconciler
                 .getReconciler()
@@ -1022,12 +1027,20 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
 
     @Test
     public void testTerminalJmTtlOnFinished() throws Throwable {
-        testTerminalJmTtl(dep -> 
dep.getStatus().getJobStatus().setState("FINISHED"));
+        testTerminalJmTtl(
+                dep ->
+                        dep.getStatus()
+                                .getJobStatus()
+                                
.setState(org.apache.flink.api.common.JobStatus.FINISHED));
     }
 
     @Test
     public void testTerminalJmTtlOnFailed() throws Throwable {
-        testTerminalJmTtl(dep -> 
dep.getStatus().getJobStatus().setState("FAILED"));
+        testTerminalJmTtl(
+                dep ->
+                        dep.getStatus()
+                                .getJobStatus()
+                                
.setState(org.apache.flink.api.common.JobStatus.FAILED));
     }
 
     public void testTerminalJmTtl(ThrowingConsumer<FlinkDeployment> 
deploymentSetup)
@@ -1076,7 +1089,7 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
 
         
status.getReconciliationStatus().serializeAndSetLastReconciledSpec(spec, 
flinkApp);
         status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
-        
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
+        
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED);
 
         var deleted = new AtomicBoolean(false);
 
@@ -1112,9 +1125,7 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
         flinkService.clear();
         FlinkDeploymentStatus deploymentStatus = deployment.getStatus();
         
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
-        deploymentStatus
-                .getJobStatus()
-                
.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+        deploymentStatus.getJobStatus().setState(RECONCILING);
         reconciler.reconcile(deployment, context);
         Assertions.assertEquals(
                 MSG_RECOVERY, 
flinkResourceEventCollector.events.remove().getMessage());
@@ -1298,7 +1309,7 @@ public class ApplicationReconcilerTest extends 
OperatorTestBase {
                 ReconciliationState.ROLLED_BACK,
                 deployment.getStatus().getReconciliationStatus().getState());
         assertEquals(1, flinkService.listJobs().size());
-        assertEquals("RECONCILING", 
deployment.getStatus().getJobStatus().getState());
+        assertEquals(RECONCILING, 
deployment.getStatus().getJobStatus().getState());
     }
 
     @ParameterizedTest
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
index 93a25e82..9d489302 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
@@ -63,6 +63,12 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Stream;
 
+import static org.apache.flink.api.common.JobStatus.CANCELLING;
+import static org.apache.flink.api.common.JobStatus.FAILING;
+import static org.apache.flink.api.common.JobStatus.FINISHED;
+import static org.apache.flink.api.common.JobStatus.RECONCILING;
+import static org.apache.flink.api.common.JobStatus.RESTARTING;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -210,7 +216,10 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
                 .setLastStableSpec(
                         
deployment.getStatus().getReconciliationStatus().getLastReconciledSpec());
         flinkService.setHaDataAvailable(false);
-        deployment.getStatus().getJobStatus().setState("RECONCILING");
+        deployment
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.RECONCILING);
 
         Assertions.assertThrows(
                 UpgradeFailureException.class,
@@ -236,7 +245,10 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
         deployment.getSpec().setRestartNonce(200L);
         flinkService.setHaDataAvailable(false);
         
deployment.getStatus().getJobStatus().setUpgradeSavepointPath("finished_sp");
-        deployment.getStatus().getJobStatus().setState("FINISHED");
+        deployment
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.FINISHED);
         
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
         deployment
                 .getSpec()
@@ -287,7 +299,7 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
                                     0L));
         }
 
-        deployment.getStatus().getJobStatus().setState("FINISHED");
+        deployment.getStatus().getJobStatus().setState(FINISHED);
         reconciler.reconcile(deployment, context);
         reconciler.reconcile(deployment, context);
 
@@ -503,7 +515,7 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
         var jobStatus = deployment.getStatus().getJobStatus();
         long now = System.currentTimeMillis();
 
-        jobStatus.setState("RUNNING");
+        jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING);
         jobStatus.setStartTime(Long.toString(now));
         jobStatus.setJobId(new JobID().toString());
 
@@ -626,7 +638,7 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
         jobStatus.setJobId(new JobID().toString());
 
         // Running state, savepoint if possible
-        
jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
+        jobStatus.setState(RUNNING);
         var ctx = getResourceContext(deployment);
         var deployConf = ctx.getDeployConfig(deployment.getSpec());
 
@@ -637,13 +649,13 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
                 jobReconciler.getJobUpgrade(ctx, deployConf));
 
         // Not running (but cancellable)
-        
jobStatus.setState(org.apache.flink.api.common.JobStatus.RESTARTING.name());
+        jobStatus.setState(RESTARTING);
         assertEquals(
                 AbstractJobReconciler.JobUpgrade.lastStateUsingCancel(),
                 jobReconciler.getJobUpgrade(ctx, deployConf));
 
         // Unknown / reconciling
-        
jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+        jobStatus.setState(RECONCILING);
         assertEquals(
                 AbstractJobReconciler.JobUpgrade.pendingUpgrade(),
                 jobReconciler.getJobUpgrade(ctx, deployConf));
@@ -690,7 +702,7 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
         jobStatus.setJobId(new JobID().toString());
 
         // Running state, savepoint if possible
-        
jobStatus.setState(org.apache.flink.api.common.JobStatus.FAILING.name());
+        jobStatus.setState(FAILING);
         var ctx = getResourceContext(deployment);
         var deployConf = ctx.getDeployConfig(deployment.getSpec());
 
@@ -798,7 +810,7 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
         reconciler.reconcile(deployment, context);
         assertEquals(0, flinkService.getRunningCount());
         assertEquals(
-                org.apache.flink.api.common.JobStatus.FINISHED.name(),
+                org.apache.flink.api.common.JobStatus.FINISHED,
                 deployment.getStatus().getJobStatus().getState());
 
         var snapshots = 
TestUtils.getFlinkStateSnapshotsForResource(kubernetesClient, deployment);
@@ -845,11 +857,11 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
         // Ready for spec changes, the reconciliation should be performed
         verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
         reconciler.reconcile(deployment, context);
-        assertEquals("CANCELLING", 
deployment.getStatus().getJobStatus().getState());
+        assertEquals(CANCELLING, 
deployment.getStatus().getJobStatus().getState());
 
         String expectedSavepointPath = "savepoint_0";
         var jobStatus = deployment.getStatus().getJobStatus();
-        jobStatus.setState("CANCELED");
+        jobStatus.setState(org.apache.flink.api.common.JobStatus.CANCELED);
         jobStatus
                 .getSavepointInfo()
                 .setLastSavepoint(Savepoint.of(expectedSavepointPath, 
SnapshotTriggerType.UNKNOWN));
@@ -947,7 +959,7 @@ public class ApplicationReconcilerUpgradeModeTest extends 
OperatorTestBase {
                                         
.jobId(runningJobs.get(0).f1.getJobId().toHexString())
                                         
.jobName(runningJobs.get(0).f1.getJobName())
                                         
.startTime(Long.toString(System.currentTimeMillis()))
-                                        .state("RUNNING")
+                                        
.state(org.apache.flink.api.common.JobStatus.RUNNING)
                                         .build());
         
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
     }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
index 87f44b49..54484065 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
@@ -127,7 +127,7 @@ public class SessionReconcilerTest extends OperatorTestBase 
{
         FlinkDeploymentSpec spec = flinkApp.getSpec();
         Configuration deployConfig = configManager.getDeployConfig(deployMeta, 
spec);
 
-        
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
+        
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED);
         status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
         reconciler
                 .getReconciler()
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index 0185b765..c72b5082 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -122,7 +122,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
                 sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
         assertEquals(1, flinkService.listJobs().size());
         verifyAndSetRunningJobsToStatus(
-                sessionJob, JobState.RUNNING, RECONCILING.name(), null, 
flinkService.listJobs());
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
 
         // clean up
         reconciler.cleanup(
@@ -165,7 +165,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
                 sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
         assertEquals(1, flinkService.listJobs().size());
         verifyAndSetRunningJobsToStatus(
-                sessionJob, JobState.RUNNING, RECONCILING.name(), null, 
flinkService.listJobs());
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
 
         // clean up
         reconciler.cleanup(
@@ -201,7 +201,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
                 sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
         assertEquals(1, flinkService.listJobs().size());
         verifyAndSetRunningJobsToStatus(
-                sessionJob, JobState.RUNNING, RECONCILING.name(), null, 
flinkService.listJobs());
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
         // clean up
         reconciler.cleanup(
                 sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
@@ -217,7 +217,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
                 sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
         assertEquals(1, flinkService.listJobs().size());
         verifyAndSetRunningJobsToStatus(
-                sessionJob, JobState.RUNNING, RECONCILING.name(), null, 
flinkService.listJobs());
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
         // clean up
         flinkService.setPortReady(false);
         var deleteControl =
@@ -230,7 +230,10 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
         flinkService.setPortReady(true);
         reconciler.cleanup(
                 sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
-        sessionJob.getStatus().getJobStatus().setState("CANCELED");
+        sessionJob
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.CANCELED);
         deleteControl =
                 reconciler.cleanup(
                         sessionJob,
@@ -248,14 +251,17 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
                 sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
         assertEquals(1, flinkService.listJobs().size());
         verifyAndSetRunningJobsToStatus(
-                sessionJob, JobState.RUNNING, RECONCILING.name(), null, 
flinkService.listJobs());
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
         // clean up
         flinkService.setFlinkJobTerminatedWithoutCancellation(true);
         var deleteControl =
                 reconciler.cleanup(
                         sessionJob,
                         
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
-        sessionJob.getStatus().getJobStatus().setState("CANCELED");
+        sessionJob
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.CANCELED);
 
         deleteControl =
                 reconciler.cleanup(
@@ -273,16 +279,19 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
                 sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
         assertEquals(1, flinkService.listJobs().size());
         verifyAndSetRunningJobsToStatus(
-                sessionJob, JobState.RUNNING, RECONCILING.name(), null, 
flinkService.listJobs());
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
         sessionJob.getSpec().setRestartNonce(2L);
         reconciler.reconcile(
                 sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
         assertEquals(CANCELED, 
flinkService.listJobs().get(0).f1.getJobState());
-        sessionJob.getStatus().getJobStatus().setState("CANCELED");
+        sessionJob
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.CANCELED);
         reconciler.reconcile(
                 sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
         verifyAndSetRunningJobsToStatus(
-                sessionJob, JobState.RUNNING, RECONCILING.name(), null, 
flinkService.listJobs());
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
     }
 
     @Test
@@ -294,9 +303,9 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
         reconciler.reconcile(sessionJob, readyContext);
         assertEquals(1, flinkService.listJobs().size());
         verifyAndSetRunningJobsToStatus(
-                sessionJob, JobState.RUNNING, RECONCILING.name(), null, 
flinkService.listJobs());
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
 
-        sessionJob.getStatus().getJobStatus().setState(FAILED.name());
+        sessionJob.getStatus().getJobStatus().setState(FAILED);
         reconciler.reconcile(sessionJob, readyContext);
         assertEquals(2, flinkService.listJobs().size());
         assertEquals(RUNNING, flinkService.listJobs().get(1).f1.getJobState());
@@ -313,7 +322,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
         verifyAndSetRunningJobsToStatus(
                 sessionJob,
                 JobState.RUNNING,
-                RECONCILING.name(),
+                RECONCILING,
                 initSavepointPath,
                 flinkService.listJobs());
     }
@@ -326,7 +335,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
         reconciler.reconcile(sessionJob, readyContext);
         assertEquals(1, flinkService.listJobs().size());
         verifyAndSetRunningJobsToStatus(
-                sessionJob, JobState.RUNNING, RECONCILING.name(), null, 
flinkService.listJobs());
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
 
         var statelessSessionJob = ReconciliationUtils.clone(sessionJob);
         
statelessSessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
@@ -334,18 +343,20 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
         // job suspended first
         reconciler.reconcile(statelessSessionJob, readyContext);
         assertEquals(CANCELED, 
flinkService.listJobs().get(0).f1.getJobState());
-        verifyJobState(statelessSessionJob, JobState.RUNNING, "CANCELLING");
-        statelessSessionJob.getStatus().getJobStatus().setState("CANCELED");
+        verifyJobState(
+                statelessSessionJob,
+                JobState.RUNNING,
+                org.apache.flink.api.common.JobStatus.CANCELLING);
+        statelessSessionJob
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.CANCELED);
 
         flinkService.clear();
         reconciler.reconcile(statelessSessionJob, readyContext);
         assertEquals(1, flinkService.listJobs().size());
         verifyAndSetRunningJobsToStatus(
-                statelessSessionJob,
-                JobState.RUNNING,
-                RECONCILING.name(),
-                null,
-                flinkService.listJobs());
+                statelessSessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
     }
 
     @ParameterizedTest
@@ -377,17 +388,13 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
         statefulSessionJob.getSpec().getJob().setParallelism(3);
 
         verifyAndSetRunningJobsToStatus(
-                statefulSessionJob,
-                JobState.RUNNING,
-                RECONCILING.name(),
-                null,
-                flinkService.listJobs());
+                statefulSessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
 
         reconciler.reconcile(statefulSessionJob, readyContext);
 
         // job suspended first
         assertEquals(FINISHED, 
flinkService.listJobs().get(0).f1.getJobState());
-        verifyJobState(statefulSessionJob, JobState.SUSPENDED, "FINISHED");
+        verifyJobState(statefulSessionJob, JobState.SUSPENDED, FINISHED);
         if (legacySnapshots) {
             assertEquals(
                     "savepoint_0",
@@ -416,7 +423,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
         verifyAndSetRunningJobsToStatus(
                 statefulSessionJob,
                 JobState.RUNNING,
-                RECONCILING.name(),
+                RECONCILING,
                 "savepoint_0",
                 flinkService.listJobs());
     }
@@ -431,7 +438,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
         var readyContext = 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
         reconciler.reconcile(sessionJob, readyContext);
         verifyAndSetRunningJobsToStatus(
-                sessionJob, JobState.RUNNING, RECONCILING.name(), null, 
flinkService.listJobs());
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
 
         
assertFalse(SnapshotUtils.savepointInProgress(sessionJob.getStatus().getJobStatus()));
 
@@ -443,12 +450,12 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
         
assertFalse(SnapshotUtils.savepointInProgress(sp1SessionJob.getStatus().getJobStatus()));
 
         sp1SessionJob.getSpec().getJob().setSavepointTriggerNonce(2L);
-        sp1SessionJob.getStatus().getJobStatus().setState(CREATED.name());
+        sp1SessionJob.getStatus().getJobStatus().setState(CREATED);
         reconciler.reconcile(sp1SessionJob, readyContext);
         // do not trigger savepoint if job is not running
         
assertFalse(SnapshotUtils.savepointInProgress(sp1SessionJob.getStatus().getJobStatus()));
 
-        sp1SessionJob.getStatus().getJobStatus().setState(RUNNING.name());
+        sp1SessionJob.getStatus().getJobStatus().setState(RUNNING);
 
         reconciler.reconcile(sp1SessionJob, readyContext);
         
assertTrue(SnapshotUtils.savepointInProgress(sp1SessionJob.getStatus().getJobStatus()));
@@ -505,7 +512,10 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
 
         // running -> suspended
         reconciler.reconcile(sp1SessionJob, readyContext);
-        sp1SessionJob.getStatus().getJobStatus().setState("CANCELED");
+        sp1SessionJob
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.CANCELED);
         // suspended -> running
         reconciler.reconcile(sp1SessionJob, readyContext);
         // parallelism changed
@@ -518,7 +528,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
                         .getJob()
                         .getParallelism());
         verifyAndSetRunningJobsToStatus(
-                sp1SessionJob, JobState.RUNNING, RECONCILING.name(), null, 
flinkService.listJobs());
+                sp1SessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
 
         
sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
         ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
@@ -555,7 +565,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
         var readyContext = 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
         reconciler.reconcile(sessionJob, readyContext);
         verifyAndSetRunningJobsToStatus(
-                sessionJob, JobState.RUNNING, RECONCILING.name(), null, 
flinkService.listJobs());
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
 
         
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(sessionJob)));
 
@@ -567,12 +577,12 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
         
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(sp1SessionJob)));
 
         getJobSpec(sp1SessionJob).setCheckpointTriggerNonce(2L);
-        getJobStatus(sp1SessionJob).setState(CREATED.name());
+        getJobStatus(sp1SessionJob).setState(CREATED);
         reconciler.reconcile(sp1SessionJob, readyContext);
         // do not trigger checkpoint if job is not running
         
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(sp1SessionJob)));
 
-        getJobStatus(sp1SessionJob).setState(RUNNING.name());
+        getJobStatus(sp1SessionJob).setState(RUNNING);
 
         reconciler.reconcile(sp1SessionJob, readyContext);
         
assertTrue(SnapshotUtils.checkpointInProgress(getJobStatus(sp1SessionJob)));
@@ -618,7 +628,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
         reconciler.reconcile(sessionJob, readyContext);
         assertEquals(1, flinkService.listJobs().size());
         verifyAndSetRunningJobsToStatus(
-                sessionJob, JobState.RUNNING, RECONCILING.name(), null, 
flinkService.listJobs());
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
 
         var job = flinkService.listJobs().get(0);
         var jobStatusMessage = job.f1;
@@ -634,13 +644,16 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
                         RUNNING,
                         jobStatusMessage.getStartTime());
         // Set state which must be overwritten by cancelSessionJob
-        sessionJob.getStatus().getJobStatus().setState("RUNNING");
+        sessionJob
+                .getStatus()
+                .getJobStatus()
+                .setState(org.apache.flink.api.common.JobStatus.RUNNING);
 
         flinkService.cancelSessionJob(sessionJob, suspendMode, jobConfig);
 
         assertEquals(1, flinkService.getCancelJobCallCount());
         assertEquals(CANCELED, job.f1.getJobState());
-        assertEquals(CANCELLING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+        assertEquals(CANCELLING, 
sessionJob.getStatus().getJobStatus().getState());
     }
 
     private static Stream<Arguments> cancelSavepointSessionJobParams() {
@@ -683,7 +696,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
         reconciler.reconcile(sessionJob, readyContext);
         assertEquals(1, flinkService.listJobs().size());
         verifyAndSetRunningJobsToStatus(
-                sessionJob, JobState.RUNNING, RECONCILING.name(), null, 
flinkService.listJobs());
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
 
         var job = flinkService.listJobs().get(0);
         var jobStatusMessage = job.f1;
@@ -699,7 +712,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
                         fromJobStatus,
                         jobStatusMessage.getStartTime());
         // Set state which must be overwritten by cancelSessionJob
-        sessionJob.getStatus().getJobStatus().setState(fromJobStatus.name());
+        sessionJob.getStatus().getJobStatus().setState(fromJobStatus);
 
         if (!shouldThrowException) {
             flinkService.cancelSessionJob(sessionJob, SuspendMode.SAVEPOINT, 
jobConfig);
@@ -723,7 +736,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
             assertEquals(FINISHED, job.f1.getJobState());
         }
         if (!shouldThrowException) {
-            assertEquals(FINISHED.name(), 
sessionJob.getStatus().getJobStatus().getState());
+            assertEquals(FINISHED, 
sessionJob.getStatus().getJobStatus().getState());
         }
     }
 
@@ -740,7 +753,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
     private void verifyAndSetRunningJobsToStatus(
             FlinkSessionJob sessionJob,
             JobState expectedState,
-            String jobStatusObserved,
+            org.apache.flink.api.common.JobStatus jobStatusObserved,
             @Nullable String expectedSavepointPath,
             List<Tuple3<String, JobStatusMessage, Configuration>> jobs) {
 
@@ -750,11 +763,13 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
         verifyJobState(sessionJob, expectedState, jobStatusObserved);
         JobStatus jobStatus = sessionJob.getStatus().getJobStatus();
         jobStatus.setJobName(submittedJobInfo.f1.getJobName());
-        jobStatus.setState("RUNNING");
+        jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING);
     }
 
     private void verifyJobState(
-            FlinkSessionJob sessionJob, JobState expectedState, String 
jobStatusObserved) {
+            FlinkSessionJob sessionJob,
+            JobState expectedState,
+            org.apache.flink.api.common.JobStatus jobStatusObserved) {
         assertEquals(
                 expectedState,
                 sessionJob
@@ -773,7 +788,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
         FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
         reconciler.reconcile(sessionJob, readyContext);
         verifyAndSetRunningJobsToStatus(
-                sessionJob, JobState.RUNNING, RECONCILING.name(), null, 
flinkService.listJobs());
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
 
         FlinkSessionJob spSessionJob = ReconciliationUtils.clone(sessionJob);
         spSessionJob
@@ -788,7 +803,9 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
         assertEquals(
                 "savepoint_trigger_0",
                 
spSessionJob.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
-        assertEquals(JobState.RUNNING.name(), 
spSessionJob.getStatus().getJobStatus().getState());
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.RUNNING,
+                spSessionJob.getStatus().getJobStatus().getState());
 
         configManager.updateDefaultConfig(
                 Configuration.fromMap(
@@ -802,7 +819,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
         assertEquals(
                 "savepoint_trigger_0",
                 
spSessionJob.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
-        assertEquals("CANCELLING", 
spSessionJob.getStatus().getJobStatus().getState());
+        assertEquals(CANCELLING, 
spSessionJob.getStatus().getJobStatus().getState());
     }
 
     @Test
@@ -816,8 +833,7 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
                 ReconciliationState.DEPLOYED,
                 sessionJob.getStatus().getReconciliationStatus().getState());
         var jobID = sessionJob.getStatus().getJobStatus().getJobId();
-        Assertions.assertEquals(
-                RECONCILING.name(), 
sessionJob.getStatus().getJobStatus().getState());
+        Assertions.assertEquals(RECONCILING, 
sessionJob.getStatus().getJobStatus().getState());
         Assertions.assertEquals(jobID, 
flinkService.listJobs().get(0).f1.getJobId().toString());
 
         flinkService.setSessionJobSubmittedCallback(
@@ -831,7 +847,10 @@ public class SessionJobReconcilerTest extends 
OperatorTestBase {
                 () -> {
                     // suspend
                     reconciler.reconcile(sessionJob, readyContext);
-                    sessionJob.getStatus().getJobStatus().setState("CANCELED");
+                    sessionJob
+                            .getStatus()
+                            .getJobStatus()
+                            
.setState(org.apache.flink.api.common.JobStatus.CANCELED);
                     // upgrade
                     reconciler.reconcile(sessionJob, readyContext);
                 });
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index 514acbd4..336de17e 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -133,6 +133,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.api.common.JobStatus.CANCELLING;
+import static org.apache.flink.api.common.JobStatus.FAILING;
+import static org.apache.flink.api.common.JobStatus.FINISHED;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
 import static 
org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
 import static 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -293,7 +297,7 @@ public class AbstractFlinkServiceTest {
         ReconciliationUtils.updateStatusForDeployedSpec(deployment, new 
Configuration());
 
         
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
-        deployment.getStatus().getJobStatus().setState("RUNNING");
+        deployment.getStatus().getJobStatus().setState(RUNNING);
         flinkService.cancelJob(
                 deployment,
                 SuspendMode.STATELESS,
@@ -303,7 +307,7 @@ public class AbstractFlinkServiceTest {
         assertEquals(jobID, cancelFuture.get());
         assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
         assertNull(jobStatus.getUpgradeSavepointPath());
-        assertEquals("FINISHED", jobStatus.getState());
+        assertEquals(FINISHED, jobStatus.getState());
         assertEquals(
                 List.of(
                         Tuple2.of(
@@ -336,7 +340,7 @@ public class AbstractFlinkServiceTest {
         var job = TestUtils.buildSessionJob();
         var jobStatus = job.getStatus().getJobStatus();
         jobStatus.setJobId(jobID.toHexString());
-        jobStatus.setState("RUNNING");
+        jobStatus.setState(RUNNING);
         ReconciliationUtils.updateStatusForDeployedSpec(job, new 
Configuration());
 
         if (statusCode == 500) {
@@ -345,10 +349,10 @@ public class AbstractFlinkServiceTest {
                     () ->
                             flinkService.cancelSessionJob(
                                     job, SuspendMode.STATELESS, new 
Configuration()));
-            assertEquals("RUNNING", jobStatus.getState());
+            assertEquals(RUNNING, jobStatus.getState());
         } else {
             flinkService.cancelSessionJob(job, SuspendMode.STATELESS, new 
Configuration());
-            assertEquals("CANCELLING", jobStatus.getState());
+            assertEquals(CANCELLING, jobStatus.getState());
         }
     }
 
@@ -380,7 +384,7 @@ public class AbstractFlinkServiceTest {
         
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
         JobStatus jobStatus = deployment.getStatus().getJobStatus();
         jobStatus.setJobId(jobID.toHexString());
-        
jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
+        jobStatus.setState(RUNNING);
         ReconciliationUtils.updateStatusForDeployedSpec(deployment, new 
Configuration());
 
         var result =
@@ -396,7 +400,7 @@ public class AbstractFlinkServiceTest {
         assertFalse(stopWithSavepointFuture.get().f1);
         assertEquals(savepointPath, stopWithSavepointFuture.get().f2);
 
-        assertEquals(jobStatus.getState(), 
org.apache.flink.api.common.JobStatus.FINISHED.name());
+        assertEquals(jobStatus.getState(), 
org.apache.flink.api.common.JobStatus.FINISHED);
         assertEquals(
                 deployment.getStatus().getJobManagerDeploymentStatus(),
                 deleteAfterSavepoint
@@ -443,7 +447,7 @@ public class AbstractFlinkServiceTest {
         
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
         JobStatus jobStatus = deployment.getStatus().getJobStatus();
         jobStatus.setJobId(jobID.toHexString());
-        
jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
+        jobStatus.setState(RUNNING);
         ReconciliationUtils.updateStatusForDeployedSpec(deployment, new 
Configuration());
 
         if (drainOnSavepoint) {
@@ -467,7 +471,7 @@ public class AbstractFlinkServiceTest {
 
         assertTrue(stopWithSavepointFuture.isDone());
         assertEquals(jobID, stopWithSavepointFuture.get().f0);
-        assertEquals(jobStatus.getState(), 
org.apache.flink.api.common.JobStatus.FINISHED.name());
+        assertEquals(jobStatus.getState(), 
org.apache.flink.api.common.JobStatus.FINISHED);
 
         if (drainOnSavepoint) {
             assertTrue(stopWithSavepointFuture.get().f1);
@@ -509,7 +513,7 @@ public class AbstractFlinkServiceTest {
 
         JobStatus jobStatus = job.getStatus().getJobStatus();
         jobStatus.setJobId(jobID.toHexString());
-        
jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
+        jobStatus.setState(RUNNING);
         ReconciliationUtils.updateStatusForDeployedSpec(job, new 
Configuration());
 
         if (drainOnSavepoint) {
@@ -527,7 +531,7 @@ public class AbstractFlinkServiceTest {
         var result = flinkService.cancelSessionJob(job, SuspendMode.SAVEPOINT, 
deployConf);
         assertTrue(stopWithSavepointFuture.isDone());
         assertEquals(jobID, stopWithSavepointFuture.get().f0);
-        assertEquals(jobStatus.getState(), 
org.apache.flink.api.common.JobStatus.FINISHED.name());
+        assertEquals(jobStatus.getState(), 
org.apache.flink.api.common.JobStatus.FINISHED);
 
         assertEquals(savepointPath, result.getSavepointPath().get());
 
@@ -549,15 +553,13 @@ public class AbstractFlinkServiceTest {
         JobID jobID = JobID.generate();
         JobStatus jobStatus = deployment.getStatus().getJobStatus();
         jobStatus.setJobId(jobID.toHexString());
-        jobStatus.setState("FAILING");
+        jobStatus.setState(FAILING);
 
         flinkService.cancelJob(
                 deployment, SuspendMode.CANCEL, 
configManager.getObserveConfig(deployment), false);
         assertTrue(flinkService.haDeleted.isEmpty());
         assertTrue(flinkService.deleted.isEmpty());
-        assertEquals("CANCELLING", jobStatus.getState());
-        assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
-        assertNull(jobStatus.getUpgradeSavepointPath());
+        assertEquals(CANCELLING, jobStatus.getState());
     }
 
     @Test
@@ -752,7 +754,7 @@ public class AbstractFlinkServiceTest {
         
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
         JobStatus jobStatus = deployment.getStatus().getJobStatus();
         jobStatus.setJobId(jobID.toHexString());
-        
jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
+        jobStatus.setState(RUNNING);
         ReconciliationUtils.updateStatusForDeployedSpec(deployment, new 
Configuration());
 
         jobStatus.setJobId(jobID.toString());
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index 23f179b2..352c726c 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.service;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.JobManagerOptions;
@@ -288,7 +289,7 @@ public class NativeFlinkServiceTest {
                 Map.of(v1.toHexString(), "4", v2.toHexString(), "1"));
         spec.setFlinkConfiguration(appConfig.toMap());
 
-        flinkDep.getStatus().getJobStatus().setState("RUNNING");
+        flinkDep.getStatus().getJobStatus().setState(JobStatus.RUNNING);
 
         current.set(
                 Map.of(
@@ -357,16 +358,22 @@ public class NativeFlinkServiceTest {
 
         // Make sure we only try to rescale non-terminal
         testScaleConditionDep(
-                flinkDep, service, d -> 
d.getStatus().getJobStatus().setState("FAILED"), false);
+                flinkDep,
+                service,
+                d -> d.getStatus().getJobStatus().setState(JobStatus.FAILED),
+                false);
 
         testScaleConditionDep(
                 flinkDep,
                 service,
-                d -> d.getStatus().getJobStatus().setState("RECONCILING"),
+                d -> 
d.getStatus().getJobStatus().setState(JobStatus.RECONCILING),
                 false);
 
         testScaleConditionDep(
-                flinkDep, service, d -> 
d.getStatus().getJobStatus().setState("RUNNING"), true);
+                flinkDep,
+                service,
+                d -> d.getStatus().getJobStatus().setState(JobStatus.RUNNING),
+                true);
 
         testScaleConditionDep(flinkDep, service, d -> 
d.getSpec().setJob(null), false);
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
index 1a3dc8e6..7b05226b 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
@@ -42,6 +42,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.api.common.JobStatus.FAILED;
 import static org.apache.flink.kubernetes.operator.TestUtils.reconcileSpec;
 import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED;
 import static 
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.IN_PROGRESS;
@@ -243,7 +244,7 @@ public class FlinkStateSnapshotUtilsTest {
         assertFalse(result);
         assertThat(eventCollector.events).isEmpty();
 
-        deployment.getStatus().getJobStatus().setState("FAILED");
+        deployment.getStatus().getJobStatus().setState(FAILED);
         result =
                 FlinkStateSnapshotUtils.abandonSnapshotIfJobNotRunning(
                         client, snapshot, deployment, eventRecorder);
@@ -262,7 +263,7 @@ public class FlinkStateSnapshotUtilsTest {
     @Test
     public void testAbandonSnapshotIfJobNotRunningJobFailed() {
         var deployment = initDeployment();
-        deployment.getStatus().getJobStatus().setState("FAILED");
+        deployment.getStatus().getJobStatus().setState(FAILED);
         var snapshot = initSavepoint(IN_PROGRESS, null);
         var eventCollector = new FlinkStateSnapshotEventCollector();
         var eventRecorder = new EventRecorder((x, y) -> {}, eventCollector);
@@ -341,7 +342,7 @@ public class FlinkStateSnapshotUtilsTest {
 
     private static FlinkDeployment initDeployment() {
         FlinkDeployment deployment = 
TestUtils.buildApplicationCluster(FlinkVersion.v1_19);
-        
deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+        deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING);
         
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
         reconcileSpec(deployment);
         return deployment;
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
index 37f05309..5b3cb541 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
@@ -359,7 +359,7 @@ public class SnapshotUtilsTest {
                 .getMetadata()
                 
.setCreationTimestamp(Instant.now().minus(Duration.ofMinutes(15)).toString());
 
-        
deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+        deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING);
         
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
         reconcileSpec(deployment);
         return deployment;
diff --git 
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml 
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index b4c2d4c0..6042799d 100644
--- 
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ 
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -10355,6 +10355,18 @@ spec:
                   startTime:
                     type: string
                   state:
+                    enum:
+                    - CANCELED
+                    - CANCELLING
+                    - CREATED
+                    - FAILED
+                    - FAILING
+                    - FINISHED
+                    - INITIALIZING
+                    - RECONCILING
+                    - RESTARTING
+                    - RUNNING
+                    - SUSPENDED
                     type: string
                   updateTime:
                     type: string
diff --git 
a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml 
b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
index b7ea567d..431787c7 100644
--- 
a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
+++ 
b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
@@ -197,6 +197,18 @@ spec:
                   startTime:
                     type: string
                   state:
+                    enum:
+                    - CANCELED
+                    - CANCELLING
+                    - CREATED
+                    - FAILED
+                    - FAILING
+                    - FINISHED
+                    - INITIALIZING
+                    - RECONCILING
+                    - RESTARTING
+                    - RUNNING
+                    - SUSPENDED
                     type: string
                   updateTime:
                     type: string

Reply via email to