This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new b3b45ba0 [FLINK-36425][refactor] Replace jobStatus.state string with
Flink JobStatus enum
b3b45ba0 is described below
commit b3b45ba0bd31322f07d7b3aa0a9e10bba923ae22
Author: Gyula Fora <[email protected]>
AuthorDate: Sun Aug 18 11:59:58 2024 +0200
[FLINK-36425][refactor] Replace jobStatus.state string with Flink JobStatus
enum
---
docs/content/docs/custom-resource/reference.md | 2 +-
flink-kubernetes-operator-api/pom.xml | 4 +-
.../operator/api/status/CommonStatus.java | 5 +-
.../kubernetes/operator/api/status/JobStatus.java | 2 +-
.../operator/api/status/SavepointInfo.java | 2 +
.../api/validation/CrdCompatibilityChecker.java | 5 +-
.../controller/FlinkDeploymentController.java | 2 +-
.../operator/controller/FlinkResourceContext.java | 16 +--
.../operator/observer/JobStatusObserver.java | 12 +-
.../AbstractFlinkDeploymentObserver.java | 6 +-
.../operator/reconciler/ReconciliationUtils.java | 33 ++----
.../AbstractFlinkResourceReconciler.java | 3 +-
.../deployment/AbstractJobReconciler.java | 3 +-
.../deployment/ApplicationReconciler.java | 4 +-
.../sessionjob/SessionJobReconciler.java | 2 +-
.../operator/service/AbstractFlinkService.java | 11 +-
.../operator/service/NativeFlinkService.java | 2 +-
.../controller/DeploymentRecoveryTest.java | 6 +-
.../controller/FailedDeploymentRestartTest.java | 9 +-
.../controller/FlinkDeploymentControllerTest.java | 33 +++---
.../controller/FlinkSessionJobControllerTest.java | 40 +++----
.../FlinkStateSnapshotControllerTest.java | 12 +-
.../operator/controller/RollbackTest.java | 9 +-
.../controller/UnhealthyDeploymentRestartTest.java | 9 +-
.../lifecycle/ResourceLifecycleMetricsTest.java | 8 +-
.../operator/observer/JobStatusObserverTest.java | 4 +-
.../observer/SnapshotObserverLegacyTest.java | 5 +-
.../deployment/ApplicationObserverTest.java | 17 +--
.../sessionjob/FlinkSessionJobObserverTest.java | 36 +++---
.../deployment/ApplicationReconcilerTest.java | 45 +++++---
.../ApplicationReconcilerUpgradeModeTest.java | 36 ++++--
.../deployment/SessionReconcilerTest.java | 2 +-
.../sessionjob/SessionJobReconcilerTest.java | 121 ++++++++++++---------
.../operator/service/AbstractFlinkServiceTest.java | 34 +++---
.../operator/service/NativeFlinkServiceTest.java | 15 ++-
.../utils/FlinkStateSnapshotUtilsTest.java | 7 +-
.../operator/utils/SnapshotUtilsTest.java | 2 +-
.../crds/flinkdeployments.flink.apache.org-v1.yml | 12 ++
.../crds/flinksessionjobs.flink.apache.org-v1.yml | 12 ++
39 files changed, 323 insertions(+), 265 deletions(-)
diff --git a/docs/content/docs/custom-resource/reference.md
b/docs/content/docs/custom-resource/reference.md
index c336e1c4..600f6289 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -402,7 +402,7 @@ This serves as a full reference for FlinkDeployment and
FlinkSessionJob custom r
| ----------| ---- | ---- |
| jobName | java.lang.String | Name of the job. |
| jobId | java.lang.String | Flink JobId of the Job. |
-| state | java.lang.String | Last observed state of the job. |
+| state | org.apache.flink.api.common.JobStatus | Last observed state of the
job. |
| startTime | java.lang.String | Start time of the job. |
| updateTime | java.lang.String | Update time of the job. |
| upgradeSavepointPath | java.lang.String | |
diff --git a/flink-kubernetes-operator-api/pom.xml
b/flink-kubernetes-operator-api/pom.xml
index da1b59a6..40ec2642 100644
--- a/flink-kubernetes-operator-api/pom.xml
+++ b/flink-kubernetes-operator-api/pom.xml
@@ -226,7 +226,7 @@ under the License.
fork="true" failonerror="true">
<classpath
refid="maven.compile.classpath"/>
<arg
value="file://${rootDir}/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml"/>
- <arg
value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.6.0/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml"/>
+ <arg
value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.9.0/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml"/>
</java>
</target>
</configuration>
@@ -243,7 +243,7 @@ under the License.
fork="true" failonerror="true">
<classpath
refid="maven.compile.classpath"/>
<arg
value="file://${rootDir}/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml"/>
- <arg
value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.6.0/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml"/>
+ <arg
value="https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.9.0/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml"/>
</java>
</target>
</configuration>
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
index 0e9635cb..9d8c6475 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
@@ -81,10 +81,7 @@ public abstract class CommonStatus<SPEC extends
AbstractFlinkSpec> {
return ResourceLifecycleState.SUSPENDED;
}
- var jobState = getJobStatus().getState();
- if (jobState != null
- && org.apache.flink.api.common.JobStatus.valueOf(jobState)
- .equals(org.apache.flink.api.common.JobStatus.FAILED))
{
+ if (getJobStatus().getState() ==
org.apache.flink.api.common.JobStatus.FAILED) {
return ResourceLifecycleState.FAILED;
}
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
index 6adef53c..40d67a34 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/JobStatus.java
@@ -42,7 +42,7 @@ public class JobStatus {
/** Last observed state of the job. */
@PrinterColumn(name = "Job Status")
- private String state;
+ private org.apache.flink.api.common.JobStatus state;
/** Start time of the job. */
private String startTime;
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/SavepointInfo.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/SavepointInfo.java
index bdbe8de0..6ae9c27d 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/SavepointInfo.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/SavepointInfo.java
@@ -88,6 +88,8 @@ public class SavepointInfo implements SnapshotInfo {
*/
public void updateLastSavepoint(Savepoint savepoint) {
if (savepoint == null) {
+ // In terminal states we have to handle the case when there is
actually no savepoint to
+ // not restore from an old one
lastSavepoint = null;
} else if (lastSavepoint == null
||
!lastSavepoint.getLocation().equals(savepoint.getLocation())) {
diff --git
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java
index faf6024a..8dce0d89 100644
---
a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java
+++
b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java
@@ -160,7 +160,10 @@ public class CrdCompatibilityChecker {
protected static void checkStringTypeCompatibility(
String path, JsonNode oldNode, JsonNode newNode) {
if (!oldNode.has("enum") && newNode.has("enum")) {
- err("Cannot turn string into enum for " + path);
+ // We make an exception here for jobstatus.state, this is a
backward compatible change
+ if (!path.equals(".status.jobStatus.state")) {
+ err("Cannot turn string into enum for " + path);
+ }
}
if (oldNode.has("enum")) {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index dbaa6d48..328c2fb7 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -176,7 +176,7 @@ public class FlinkDeploymentController
var flinkApp = ctx.getResource();
LOG.error("Flink Deployment failed", dfe);
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
-
flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING.name());
+ flinkApp.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
ReconciliationUtils.updateForReconciliationError(ctx, dfe);
eventRecorder.triggerEvent(
flinkApp,
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
index 0d5c5510..3010f478 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
-import
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
@@ -73,7 +72,7 @@ public abstract class FlinkResourceContext<CR extends
AbstractFlinkResource<?, ?
CommonStatus<?> status = getResource().getStatus();
String jobId = status.getJobStatus().getJobId();
- JobStatus jobStatus = generateJobStatusEnum(status);
+ JobStatus jobStatus = status.getJobStatus().getState();
return new KubernetesJobAutoScalerContext(
jobId == null ? null : JobID.fromHexString(jobId),
@@ -84,19 +83,6 @@ public abstract class FlinkResourceContext<CR extends
AbstractFlinkResource<?, ?
this);
}
- @Nullable
- private JobStatus generateJobStatusEnum(CommonStatus<?> status) {
- if (status.getLifecycleState() != ResourceLifecycleState.STABLE) {
- return null;
- }
-
- String state = status.getJobStatus().getState();
- if (state == null) {
- return null;
- }
- return JobStatus.valueOf(state);
- }
-
/**
* Get the config that is currently deployed for the resource spec. The
returned config may be
* null in case the resource is not accessible/ready yet.
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
index e299531f..c94c1231 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java
@@ -122,7 +122,7 @@ public class JobStatusObserver<R extends
AbstractFlinkResource<?, ?>> {
// upgrading state and retry the upgrade (if possible)
resource.getStatus().getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
}
- jobStatus.setState(JobStatus.RECONCILING.name());
+ jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING);
resource.getStatus().setError(JOB_NOT_FOUND_ERR);
}
@@ -135,9 +135,9 @@ public class JobStatusObserver<R extends
AbstractFlinkResource<?, ?>> {
*/
private void ifRunningMoveToReconciling(
org.apache.flink.kubernetes.operator.api.status.JobStatus
jobStatus,
- String previousJobStatus) {
- if (JobStatus.RUNNING.name().equals(previousJobStatus)) {
- jobStatus.setState(JobStatus.RECONCILING.name());
+ JobStatus previousJobStatus) {
+ if (JobStatus.RUNNING == previousJobStatus) {
+ jobStatus.setState(JobStatus.RECONCILING);
}
}
@@ -160,7 +160,7 @@ public class JobStatusObserver<R extends
AbstractFlinkResource<?, ?>> {
var previousJobStatus = jobStatus.getState();
var currentJobStatus = clusterJobStatus.getJobState();
- jobStatus.setState(clusterJobStatus.getJobState().name());
+ jobStatus.setState(currentJobStatus);
jobStatus.setJobName(clusterJobStatus.getJobName());
jobStatus.setStartTime(String.valueOf(clusterJobStatus.getStartTime()));
@@ -177,7 +177,7 @@ public class JobStatusObserver<R extends
AbstractFlinkResource<?, ?>> {
if (JobStatus.CANCELED == currentJobStatus
|| (currentJobStatus.isGloballyTerminalState()
- &&
JobStatus.CANCELLING.name().equals(previousJobStatus))) {
+ &&
JobStatus.CANCELLING.equals(previousJobStatus))) {
// The job was cancelled
markSuspended(resource);
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
index fd19713a..9020cbfa 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractFlinkDeploymentObserver.java
@@ -135,7 +135,7 @@ public abstract class AbstractFlinkDeploymentObserver
checkContainerBackoff(ctx);
} catch (DeploymentFailedException dfe) {
// throw only when not already in error status to allow for
spec update
-
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
+
deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING);
if (!JobManagerDeploymentStatus.ERROR.equals(
deploymentStatus.getJobManagerDeploymentStatus())) {
throw dfe;
@@ -149,7 +149,7 @@ public abstract class AbstractFlinkDeploymentObserver
}
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
- deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
+ deploymentStatus.getJobStatus().setState(JobStatus.RECONCILING);
if (previousJmStatus != JobManagerDeploymentStatus.MISSING
&& previousJmStatus != JobManagerDeploymentStatus.ERROR) {
@@ -192,7 +192,7 @@ public abstract class AbstractFlinkDeploymentObserver
FlinkDeploymentStatus status = dep.getStatus();
var reconciliationStatus = status.getReconciliationStatus();
if (status.getJobManagerDeploymentStatus() !=
JobManagerDeploymentStatus.ERROR
- &&
!JobStatus.FAILED.name().equals(dep.getStatus().getJobStatus().getState())
+ &&
!JobStatus.FAILED.equals(dep.getStatus().getJobStatus().getState())
&& reconciliationStatus.isLastReconciledSpecStable()) {
status.setError(null);
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 15e15141..8dba93bc 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -55,7 +55,10 @@ import java.time.Duration;
import java.time.Instant;
import java.util.function.BiConsumer;
+import static org.apache.flink.api.common.JobStatus.CANCELED;
+import static org.apache.flink.api.common.JobStatus.CANCELLING;
import static org.apache.flink.api.common.JobStatus.FINISHED;
+import static org.apache.flink.api.common.JobStatus.RECONCILING;
import static org.apache.flink.api.common.JobStatus.RUNNING;
import static
org.apache.flink.kubernetes.operator.utils.FlinkResourceExceptionUtils.updateFlinkResourceException;
import static
org.apache.flink.kubernetes.operator.utils.FlinkResourceExceptionUtils.updateFlinkStateSnapshotException;
@@ -233,7 +236,7 @@ public class ReconciliationUtils {
}
}
- public static void updateForReconciliationError(FlinkResourceContext ctx,
Throwable error) {
+ public static void updateForReconciliationError(FlinkResourceContext<?>
ctx, Throwable error) {
updateFlinkResourceException(error, ctx.getResource(),
ctx.getOperatorConfig());
}
@@ -352,35 +355,23 @@ public class ReconciliationUtils {
public static boolean isJobInTerminalState(CommonStatus<?> status) {
var jobState = status.getJobStatus().getState();
- if (jobState == null) {
- jobState =
org.apache.flink.api.common.JobStatus.RECONCILING.name();
- }
- return
org.apache.flink.api.common.JobStatus.valueOf(jobState).isGloballyTerminalState();
+ return jobState != null && jobState.isGloballyTerminalState();
}
public static boolean isJobRunning(CommonStatus<?> status) {
- return org.apache.flink.api.common.JobStatus.RUNNING
- .name()
- .equals(status.getJobStatus().getState());
+ return RUNNING == status.getJobStatus().getState();
}
public static boolean isJobCancelled(CommonStatus<?> status) {
- return org.apache.flink.api.common.JobStatus.CANCELED
- .name()
- .equals(status.getJobStatus().getState());
+ return CANCELED == status.getJobStatus().getState();
}
public static boolean isJobCancellable(CommonStatus<?> status) {
- return !org.apache.flink.api.common.JobStatus.RECONCILING
- .name()
- .equals(status.getJobStatus().getState());
+ return RECONCILING != status.getJobStatus().getState();
}
public static boolean isJobCancelling(CommonStatus<?> status) {
- return status.getJobStatus() != null
- && org.apache.flink.api.common.JobStatus.CANCELLING
- .name()
- .equals(status.getJobStatus().getState());
+ return status.getJobStatus() != null && CANCELLING ==
status.getJobStatus().getState();
}
/**
@@ -503,8 +494,7 @@ public class ReconciliationUtils {
* @param status Status to be updated.
*/
public static void checkAndUpdateStableSpec(CommonStatus<?> status) {
- var flinkJobStatus =
-
org.apache.flink.api.common.JobStatus.valueOf(status.getJobStatus().getState());
+ var flinkJobStatus = status.getJobStatus().getState();
if (status.getReconciliationStatus().getState() !=
ReconciliationState.DEPLOYED) {
return;
@@ -542,8 +532,7 @@ public class ReconciliationUtils {
var lastJobSpec = lastSpecWithMeta.getSpec().getJob();
if (lastJobSpec != null) {
lastJobSpec.setState(JobState.RUNNING);
- status.getJobStatus()
-
.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+ status.getJobStatus().setState(RECONCILING);
}
reconciliationStatus.setState(ReconciliationState.DEPLOYED);
reconciliationStatus.setLastReconciledSpec(
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index cc57889b..757e1a7c 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -18,7 +18,6 @@
package org.apache.flink.kubernetes.operator.reconciler.deployment;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
@@ -511,7 +510,7 @@ public abstract class AbstractFlinkResourceReconciler<
boolean nonTerminalApplication =
!sessionCluster
&& deployedJob.getState() == JobState.RUNNING
- &&
!JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState();
+ && !jobStatus.getState().isGloballyTerminalState();
boolean jmShouldBeRunning = sessionCluster || nonTerminalApplication;
return jmShouldBeRunning
&& (status.getJobManagerDeploymentStatus() ==
JobManagerDeploymentStatus.MISSING);
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 594632b2..a6db17b9 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -420,8 +420,7 @@ public abstract class AbstractJobReconciler<
@Override
public boolean reconcileOtherChanges(FlinkResourceContext<CR> ctx) throws
Exception {
var status = ctx.getResource().getStatus();
- var jobStatus =
-
org.apache.flink.api.common.JobStatus.valueOf(status.getJobStatus().getState());
+ var jobStatus = status.getJobStatus().getState();
if (jobStatus == org.apache.flink.api.common.JobStatus.FAILED
&&
ctx.getObserveConfig().getBoolean(OPERATOR_JOB_RESTART_FAILED)) {
LOG.info("Stopping failed Flink job...");
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index a078a593..71cf4179 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -124,7 +124,7 @@ public class ApplicationReconciler
private void deleteJmThatNeverStarted(
FlinkService flinkService, FlinkDeployment deployment,
Configuration deployConfig) {
-
deployment.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
+ deployment.getStatus().getJobStatus().setState(JobStatus.FAILED);
flinkService.deleteClusterDeployment(
deployment.getMetadata(), deployment.getStatus(),
deployConfig, false);
LOG.info("Deleted application cluster that never started.");
@@ -181,7 +181,7 @@ public class ApplicationReconciler
MSG_SUBMIT,
ctx.getKubernetesClient());
flinkService.submitApplicationCluster(spec.getJob(), deployConfig,
requireHaMetadata);
-
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING);
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
IngressUtils.updateIngressRules(
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index af227a93..f1fa8885 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -94,7 +94,7 @@ public class SessionJobReconciler
savepoint.orElse(null));
var status = ctx.getResource().getStatus();
-
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING);
}
@Override
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index c89baf89..68d7fe89 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -343,7 +343,7 @@ public abstract class AbstractFlinkService implements
FlinkService {
deployment.getMetadata(), status, conf,
suspendMode.deleteHaMeta());
}
- status.getJobStatus().setState(JobStatus.FINISHED.name());
+ status.getJobStatus().setState(JobStatus.FINISHED);
return CancelResult.completed(savepointPath);
}
@@ -366,7 +366,7 @@ public abstract class AbstractFlinkService implements
FlinkService {
break;
}
}
- status.getJobStatus().setState(JobStatus.FINISHED.name());
+ status.getJobStatus().setState(JobStatus.FINISHED);
status.getJobStatus().setJobId(null);
return CancelResult.completed(savepointPath);
}
@@ -404,7 +404,7 @@ public abstract class AbstractFlinkService implements
FlinkService {
"Cancellation Error",
EventRecorder.Reason.CleanupFailed.name(), e);
}
}
- status.getJobStatus().setState(JobStatus.CANCELLING.name());
+ status.getJobStatus().setState(JobStatus.CANCELLING);
}
public String savepointJobOrError(
@@ -1037,9 +1037,8 @@ public abstract class AbstractFlinkService implements
FlinkService {
protected void updateStatusAfterClusterDeletion(FlinkDeploymentStatus
status) {
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
var currentJobState = status.getJobStatus().getState();
- if (currentJobState == null
- ||
!JobStatus.valueOf(currentJobState).isGloballyTerminalState()) {
- status.getJobStatus().setState(JobStatus.FINISHED.name());
+ if (currentJobState == null ||
!currentJobState.isGloballyTerminalState()) {
+ status.getJobStatus().setState(JobStatus.FINISHED);
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index ffd071e6..a5a63f8b 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -262,7 +262,7 @@ public class NativeFlinkService extends
AbstractFlinkService {
var status = resource.getStatus();
if (ReconciliationUtils.isJobInTerminalState(status)
- ||
JobStatus.RECONCILING.name().equals(status.getJobStatus().getState())) {
+ ||
JobStatus.RECONCILING.equals(status.getJobStatus().getState())) {
LOG.info("Job in terminal or reconciling state cannot be scaled
in-place");
return false;
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
index 67857b52..76d03c9b 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/DeploymentRecoveryTest.java
@@ -17,7 +17,6 @@
package org.apache.flink.kubernetes.operator.controller;
-import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
@@ -35,6 +34,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -101,7 +101,7 @@ public class DeploymentRecoveryTest {
assertEquals(
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
- assertEquals(JobStatus.RUNNING.name(),
appCluster.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
appCluster.getStatus().getJobStatus().getState());
// Remove deployment
flinkService.setPortReady(false);
@@ -130,7 +130,7 @@ public class DeploymentRecoveryTest {
assertEquals(
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
- assertEquals("RUNNING",
appCluster.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
appCluster.getStatus().getJobStatus().getState());
assertEquals(
appCluster.getSpec(),
appCluster
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
index 8fcad0a5..c84fc74c 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
@@ -42,6 +42,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.util.Optional;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_RESTART_FAILED;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -85,7 +86,7 @@ public class FailedDeploymentRestartTest extends
OperatorTestBase {
assertEquals(
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
- assertEquals("RUNNING",
appCluster.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
appCluster.getStatus().getJobStatus().getState());
// Make deployment unhealthy
flinkService.markApplicationJobFailedWithError(
@@ -101,7 +102,7 @@ public class FailedDeploymentRestartTest extends
OperatorTestBase {
assertEquals(
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
- assertEquals("RUNNING",
appCluster.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
appCluster.getStatus().getJobStatus().getState());
// We started without savepoint
appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
@@ -129,7 +130,7 @@ public class FailedDeploymentRestartTest extends
OperatorTestBase {
assertEquals(
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
- assertEquals("RUNNING",
appCluster.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
appCluster.getStatus().getJobStatus().getState());
assertNull(flinkService.getSubmittedConf().get(SavepointConfigOptions.SAVEPOINT_PATH));
// trigger checkpoint
@@ -157,7 +158,7 @@ public class FailedDeploymentRestartTest extends
OperatorTestBase {
assertEquals(
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
- assertEquals("RUNNING",
appCluster.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
appCluster.getStatus().getJobStatus().getState());
// check savepoint_path
if (upgradeMode != UpgradeMode.STATELESS) {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 32e49502..08937e70 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -128,7 +128,7 @@ public class FlinkDeploymentControllerTest {
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals(
- org.apache.flink.api.common.JobStatus.RUNNING.name(),
+ org.apache.flink.api.common.JobStatus.RUNNING,
appCluster.getStatus().getJobStatus().getState());
assertEquals(7, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
@@ -147,7 +147,7 @@ public class FlinkDeploymentControllerTest {
JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
assertEquals(expectedJobStatus.getJobId().toHexString(),
jobStatus.getJobId());
assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
- assertEquals(expectedJobStatus.getJobState().toString(),
jobStatus.getState());
+ assertEquals(expectedJobStatus.getJobState(), jobStatus.getState());
// Validate last stable spec is still the old one
assertEquals(
@@ -350,7 +350,7 @@ public class FlinkDeploymentControllerTest {
JobManagerDeploymentStatus.ERROR,
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals(
- org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+ org.apache.flink.api.common.JobStatus.RECONCILING,
appCluster.getStatus().getJobStatus().getState());
// Validate status status
@@ -604,7 +604,7 @@ public class FlinkDeploymentControllerTest {
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals(
- org.apache.flink.api.common.JobStatus.RUNNING.name(),
+ org.apache.flink.api.common.JobStatus.RUNNING,
appCluster.getStatus().getJobStatus().getState());
assertEquals(
JobState.RUNNING,
@@ -693,8 +693,7 @@ public class FlinkDeploymentControllerTest {
JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
// jobStatus has not been set at this time
- assertEquals(
- org.apache.flink.api.common.JobStatus.RECONCILING.name(),
jobStatus.getState());
+ assertEquals(org.apache.flink.api.common.JobStatus.RECONCILING,
jobStatus.getState());
// Switches operator mode to SESSION
appCluster.getSpec().setJob(null);
@@ -716,7 +715,7 @@ public class FlinkDeploymentControllerTest {
JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
assertEquals(expectedJobStatus.getJobId().toHexString(),
jobStatus.getJobId());
assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
- assertEquals(expectedJobStatus.getJobState().toString(),
jobStatus.getState());
+ assertEquals(expectedJobStatus.getJobState(), jobStatus.getState());
}
@Test
@@ -798,11 +797,11 @@ public class FlinkDeploymentControllerTest {
testController.reconcile(appCluster, context);
if (appCluster.getSpec().getJob() != null) {
assertEquals(
- org.apache.flink.api.common.JobStatus.RUNNING.name(),
+ org.apache.flink.api.common.JobStatus.RUNNING,
appCluster.getStatus().getJobStatus().getState());
} else {
assertEquals(
- org.apache.flink.api.common.JobStatus.FINISHED.name(),
+ org.apache.flink.api.common.JobStatus.FINISHED,
appCluster.getStatus().getJobStatus().getState());
}
assertEquals(
@@ -866,7 +865,7 @@ public class FlinkDeploymentControllerTest {
testController.reconcile(appCluster, context);
assertEquals(
- org.apache.flink.api.common.JobStatus.RUNNING.name(),
+ org.apache.flink.api.common.JobStatus.RUNNING,
appCluster.getStatus().getJobStatus().getState());
assertEquals(
JobManagerDeploymentStatus.READY,
@@ -894,7 +893,7 @@ public class FlinkDeploymentControllerTest {
testController.reconcile(appCluster, context);
assertEquals(
- org.apache.flink.api.common.JobStatus.RUNNING.name(),
+ org.apache.flink.api.common.JobStatus.RUNNING,
appCluster.getStatus().getJobStatus().getState());
assertEquals(
JobManagerDeploymentStatus.READY,
@@ -1108,7 +1107,7 @@ public class FlinkDeploymentControllerTest {
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals(
- org.apache.flink.api.common.JobStatus.RUNNING.name(),
+ org.apache.flink.api.common.JobStatus.RUNNING,
appCluster.getStatus().getJobStatus().getState());
}
@@ -1154,7 +1153,7 @@ public class FlinkDeploymentControllerTest {
JobManagerDeploymentStatus.DEPLOYING,
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals(
- org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+ org.apache.flink.api.common.JobStatus.RECONCILING,
appCluster.getStatus().getJobStatus().getState());
assertEquals(4, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
@@ -1177,7 +1176,7 @@ public class FlinkDeploymentControllerTest {
JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals(
- org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+ org.apache.flink.api.common.JobStatus.RECONCILING,
appCluster.getStatus().getJobStatus().getState());
assertEquals(5, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
@@ -1191,7 +1190,7 @@ public class FlinkDeploymentControllerTest {
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals(
- org.apache.flink.api.common.JobStatus.RUNNING.name(),
+ org.apache.flink.api.common.JobStatus.RUNNING,
appCluster.getStatus().getJobStatus().getState());
assertEquals(6, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
@@ -1206,7 +1205,7 @@ public class FlinkDeploymentControllerTest {
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals(
- org.apache.flink.api.common.JobStatus.RUNNING.name(),
+ org.apache.flink.api.common.JobStatus.RUNNING,
appCluster.getStatus().getJobStatus().getState());
assertEquals(6, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
@@ -1220,7 +1219,7 @@ public class FlinkDeploymentControllerTest {
JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
assertEquals(expectedJobStatus.getJobId().toHexString(),
jobStatus.getJobId());
assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
- assertEquals(expectedJobStatus.getJobState().toString(),
jobStatus.getState());
+ assertEquals(expectedJobStatus.getJobState(), jobStatus.getState());
assertEquals(
appCluster.getStatus().getReconciliationStatus().getLastReconciledSpec(),
appCluster.getStatus().getReconciliationStatus().getLastStableSpec());
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
index 5b3578f3..82ccaea5 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobControllerTest.java
@@ -17,7 +17,6 @@
package org.apache.flink.kubernetes.operator.controller;
-import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
@@ -51,6 +50,9 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
+import static org.apache.flink.api.common.JobStatus.CANCELLING;
+import static org.apache.flink.api.common.JobStatus.RECONCILING;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
import static
org.apache.flink.kubernetes.operator.TestUtils.MAX_RECONCILE_TIMES;
import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED;
import static
org.apache.flink.kubernetes.operator.utils.EventRecorder.Reason.ValidationError;
@@ -119,7 +121,7 @@ class FlinkSessionJobControllerTest {
sessionJob.getSpec().getJob().setParallelism(-1);
updateControl = testController.reconcile(sessionJob, context);
- assertEquals(JobStatus.RUNNING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
sessionJob.getStatus().getJobStatus().getState());
assertEquals(6, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
@@ -137,7 +139,7 @@ class FlinkSessionJobControllerTest {
JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
assertEquals(expectedJobStatus.getJobId().toHexString(),
jobStatus.getJobId());
assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
- assertEquals(expectedJobStatus.getJobState().toString(),
jobStatus.getState());
+ assertEquals(expectedJobStatus.getJobState(), jobStatus.getState());
// Validate last stable spec is still the old one
assertEquals(
@@ -271,7 +273,7 @@ class FlinkSessionJobControllerTest {
testController.reconcile(sessionJob, context);
// Make sure we are cancelling
- assertEquals("CANCELLING",
sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(CANCELLING,
sessionJob.getStatus().getJobStatus().getState());
// Once cancelling completed make sure that last reconciled spec is
correctly upgraded and
// job was started from cp
@@ -285,7 +287,7 @@ class FlinkSessionJobControllerTest {
.deserializeLastReconciledSpec()
.getJob()
.getUpgradeMode());
- assertEquals("RECONCILING",
sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RECONCILING,
sessionJob.getStatus().getJobStatus().getState());
assertEquals(
ReconciliationState.DEPLOYED,
sessionJob.getStatus().getReconciliationStatus().getState());
@@ -296,7 +298,7 @@ class FlinkSessionJobControllerTest {
assertEquals("cp1", jobs.get(0).f0);
testController.reconcile(sessionJob, context);
- assertEquals("RUNNING",
sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
sessionJob.getStatus().getJobStatus().getState());
// Suspend job
flinkService.setCheckpointInfo(
@@ -330,7 +332,7 @@ class FlinkSessionJobControllerTest {
testController.reconcile(sessionJob, context);
// Make sure we are cancelling
- assertEquals("CANCELLING",
sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(CANCELLING,
sessionJob.getStatus().getJobStatus().getState());
testController.events().poll();
assertEquals(
testController.events().poll().getReason(),
@@ -342,7 +344,7 @@ class FlinkSessionJobControllerTest {
testController.reconcile(sessionJob, context);
assertEquals(JobStatusObserver.JOB_NOT_FOUND_ERR,
sessionJob.getStatus().getError());
- assertEquals("RECONCILING",
sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RECONCILING,
sessionJob.getStatus().getJobStatus().getState());
assertEquals(
ReconciliationState.DEPLOYED,
sessionJob.getStatus().getReconciliationStatus().getState());
@@ -496,7 +498,7 @@ class FlinkSessionJobControllerTest {
EventRecorder.Reason.JobStatusChanged,
EventRecorder.Reason.valueOf(statusEvents.get(2).getReason()));
- assertEquals(JobStatus.RUNNING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
sessionJob.getStatus().getJobStatus().getState());
assertEquals(
JobState.RUNNING,
sessionJob
@@ -518,8 +520,7 @@ class FlinkSessionJobControllerTest {
.put(KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER.key(), "changed");
updateControl = testController.reconcile(sessionJob, context);
assertFalse(updateControl.isUpdateStatus());
- assertEquals(
- JobStatus.RECONCILING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RECONCILING,
sessionJob.getStatus().getJobStatus().getState());
// Check when the bad config is applied, observe() will change the
cluster state correctly
sessionJob.getSpec().getJob().setParallelism(-1);
@@ -531,7 +532,7 @@ class FlinkSessionJobControllerTest {
.getError()
.contains("Job parallelism must be larger than 0"));
assertFalse(updateControl.isUpdateStatus());
- assertEquals(JobStatus.RUNNING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
sessionJob.getStatus().getJobStatus().getState());
// Make sure we do validation before getting effective config in
reconcile().
// Verify the saved headers in lastReconciledSpec is actually used in
observe() by
@@ -547,7 +548,7 @@ class FlinkSessionJobControllerTest {
configuration.get(
KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER)));
testController.reconcile(sessionJob, context);
- assertEquals(JobStatus.RUNNING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
sessionJob.getStatus().getJobStatus().getState());
}
@Test
@@ -572,7 +573,7 @@ class FlinkSessionJobControllerTest {
assertNull(sessionJob.getStatus().getReconciliationStatus().getLastStableSpec());
testController.reconcile(sessionJob, context);
- assertEquals(JobStatus.RUNNING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
sessionJob.getStatus().getJobStatus().getState());
assertNull(sessionJob.getStatus().getError());
assertEquals(
@@ -649,7 +650,7 @@ class FlinkSessionJobControllerTest {
var deleteControl = testController.cleanup(sessionJob, context);
- assertEquals("CANCELLING",
sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(CANCELLING,
sessionJob.getStatus().getJobStatus().getState());
assertFalse(deleteControl.isRemoveFinalizer());
assertEquals(
configManager.getOperatorConfiguration().getProgressCheckInterval().toMillis(),
@@ -688,8 +689,7 @@ class FlinkSessionJobControllerTest {
testController.reconcile(sessionJob, context);
// Reconciling
- assertEquals(
- JobStatus.RECONCILING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RECONCILING,
sessionJob.getStatus().getJobStatus().getState());
assertEquals(4, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
assertEquals(
@@ -706,7 +706,7 @@ class FlinkSessionJobControllerTest {
// Running
updateControl = testController.reconcile(sessionJob, context);
- assertEquals(JobStatus.RUNNING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
sessionJob.getStatus().getJobStatus().getState());
assertEquals(5, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
assertEquals(
@@ -716,7 +716,7 @@ class FlinkSessionJobControllerTest {
// Stable loop
updateControl = testController.reconcile(sessionJob, context);
- assertEquals(JobStatus.RUNNING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
sessionJob.getStatus().getJobStatus().getState());
assertEquals(5, testController.getInternalStatusUpdateCount());
assertFalse(updateControl.isUpdateStatus());
assertEquals(
@@ -729,7 +729,7 @@ class FlinkSessionJobControllerTest {
JobStatusMessage expectedJobStatus = flinkService.listJobs().get(0).f1;
assertEquals(expectedJobStatus.getJobId().toHexString(),
jobStatus.getJobId());
assertEquals(expectedJobStatus.getJobName(), jobStatus.getJobName());
- assertEquals(expectedJobStatus.getJobState().toString(),
jobStatus.getState());
+ assertEquals(expectedJobStatus.getJobState(), jobStatus.getState());
assertEquals(
sessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec(),
sessionJob.getStatus().getReconciliationStatus().getLastStableSpec());
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
index 9b68bea6..593841b5 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkStateSnapshotControllerTest.java
@@ -61,6 +61,8 @@ import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;
+import static org.apache.flink.api.common.JobStatus.CANCELED;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
import static
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.ABANDONED;
import static
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED;
import static
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.FAILED;
@@ -188,7 +190,7 @@ public class FlinkStateSnapshotControllerTest {
controller.reconcile(snapshot, context);
assertThat(snapshot.getStatus().getState()).isEqualTo(IN_PROGRESS);
- deployment.getStatus().getJobStatus().setState("CANCELED");
+ deployment.getStatus().getJobStatus().setState(CANCELED);
controller.reconcile(snapshot, context);
var status = snapshot.getStatus();
var createdAt =
Instant.parse(snapshot.getMetadata().getCreationTimestamp());
@@ -281,14 +283,14 @@ public class FlinkStateSnapshotControllerTest {
assertThat(flinkService.getDisposedSavepoints()).isEmpty();
// Failed dispose, job not running
- deployment.getStatus().getJobStatus().setState("CANCELED");
+ deployment.getStatus().getJobStatus().setState(CANCELED);
snapshot.getSpec().getSavepoint().setDisposeOnDelete(true);
assertDeleteControl(
controller.cleanup(snapshot, context),
false,
configManager.getOperatorConfiguration().getReconcileInterval().toMillis());
assertThat(flinkService.getDisposedSavepoints()).isEmpty();
- deployment.getStatus().getJobStatus().setState("RUNNING");
+ deployment.getStatus().getJobStatus().setState(RUNNING);
// Failed dispose, REST error
snapshot.getSpec().getSavepoint().setDisposeOnDelete(true);
@@ -565,7 +567,7 @@ public class FlinkStateSnapshotControllerTest {
@Test
public void testReconcileJobNotRunning() {
var deployment = createDeployment();
- deployment.getStatus().getJobStatus().setState("CANCELED");
+ deployment.getStatus().getJobStatus().setState(CANCELED);
context = TestUtils.createSnapshotContext(client, deployment);
var snapshot = createSavepoint(deployment);
var errorMessage =
@@ -670,7 +672,7 @@ public class FlinkStateSnapshotControllerTest {
var deployment = TestUtils.buildApplicationCluster();
deployment
.getStatus()
-
.setJobStatus(JobStatus.builder().state("RUNNING").jobId(JOB_ID).build());
+
.setJobStatus(JobStatus.builder().state(RUNNING).jobId(JOB_ID).build());
deployment.getSpec().setFlinkVersion(flinkVersion);
deployment
.getSpec()
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
index 753a241e..83fb7624 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
@@ -45,6 +45,7 @@ import java.time.Duration;
import java.util.LinkedList;
import java.util.Map;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -110,7 +111,7 @@ public class RollbackTest {
testController.reconcile(dep, context);
},
() -> {
- assertEquals("RUNNING",
dep.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
dep.getStatus().getJobStatus().getState());
assertEquals(1, flinkService.listJobs().size());
dep.getSpec().setRestartNonce(10L);
testController.reconcile(dep, context);
@@ -154,7 +155,7 @@ public class RollbackTest {
testController.reconcile(dep, context);
},
() -> {
- assertEquals("RUNNING",
dep.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
dep.getStatus().getJobStatus().getState());
assertEquals(1, flinkService.listJobs().size());
dep.getSpec().setRestartNonce(10L);
testController.reconcile(dep, context);
@@ -243,7 +244,7 @@ public class RollbackTest {
testController.reconcile(dep, context);
},
() -> {
- assertEquals("RUNNING",
dep.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
dep.getStatus().getJobStatus().getState());
assertEquals(1, flinkService.listJobs().size());
// Trigger deployment recovery
@@ -316,7 +317,7 @@ public class RollbackTest {
testController.reconcile(dep, context);
},
() -> {
- assertEquals("RUNNING",
dep.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
dep.getStatus().getJobStatus().getState());
// Make sure we started from empty state even if savepoint
was available
assertNull(new
LinkedList<>(flinkService.listJobs()).getLast().f0);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
index c7b26d1b..e19089c5 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
@@ -36,6 +36,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.time.Duration;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED;
import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW;
import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED;
@@ -95,7 +96,7 @@ public class UnhealthyDeploymentRestartTest {
assertEquals(
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
- assertEquals("RUNNING",
appCluster.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
appCluster.getStatus().getJobStatus().getState());
// Make deployment unhealthy
flinkService.setMetricValue(NUM_RESTARTS_METRIC_NAME, "100");
@@ -111,7 +112,7 @@ public class UnhealthyDeploymentRestartTest {
assertEquals(
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
- assertEquals("RUNNING",
appCluster.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
appCluster.getStatus().getJobStatus().getState());
}
@ParameterizedTest
@@ -129,7 +130,7 @@ public class UnhealthyDeploymentRestartTest {
assertEquals(
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
- assertEquals("RUNNING",
appCluster.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
appCluster.getStatus().getJobStatus().getState());
// Make deployment unhealthy
flinkService.setMetricValue(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME, "1");
@@ -153,6 +154,6 @@ public class UnhealthyDeploymentRestartTest {
assertEquals(
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
- assertEquals("RUNNING",
appCluster.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
appCluster.getStatus().getJobStatus().getState());
}
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
index 2620fb9b..a399d871 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/metrics/lifecycle/ResourceLifecycleMetricsTest.java
@@ -77,7 +77,7 @@ public class ResourceLifecycleMetricsTest {
application.getStatus().setError("errr");
assertEquals(STABLE, application.getStatus().getLifecycleState());
-
application.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
+ application.getStatus().getJobStatus().setState(JobStatus.FAILED);
assertEquals(FAILED, application.getStatus().getLifecycleState());
application.getStatus().setError(null);
@@ -88,14 +88,14 @@ public class ResourceLifecycleMetricsTest {
.setState(ReconciliationState.ROLLING_BACK);
assertEquals(ROLLING_BACK,
application.getStatus().getLifecycleState());
-
application.getStatus().getJobStatus().setState(JobStatus.RECONCILING.name());
+ application.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
application.getStatus().getReconciliationStatus().setState(ReconciliationState.ROLLED_BACK);
assertEquals(ROLLED_BACK, application.getStatus().getLifecycleState());
-
application.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
+ application.getStatus().getJobStatus().setState(JobStatus.FAILED);
assertEquals(FAILED, application.getStatus().getLifecycleState());
-
application.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+ application.getStatus().getJobStatus().setState(JobStatus.RUNNING);
application.getSpec().getJob().setState(JobState.SUSPENDED);
ReconciliationUtils.updateStatusForDeployedSpec(application, new
Configuration());
assertEquals(SUSPENDED, application.getStatus().getLifecycleState());
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
index f5b38887..512e9f7c 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java
@@ -63,7 +63,7 @@ public class JobStatusObserverTest extends OperatorTestBase {
job.getSpec().getJob().setUpgradeMode(upgradeMode);
var status = job.getStatus();
var jobStatus = status.getJobStatus();
- jobStatus.setState(fromStatus.name());
+ jobStatus.setState(fromStatus);
assertEquals(
JobState.RUNNING,
status.getReconciliationStatus()
@@ -91,7 +91,7 @@ public class JobStatusObserverTest extends OperatorTestBase {
var deployment = initDeployment();
var status = deployment.getStatus();
var jobStatus = status.getJobStatus();
- jobStatus.setState(fromStatus.name());
+ jobStatus.setState(fromStatus);
assertEquals(
JobState.RUNNING,
status.getReconciliationStatus()
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java
index fdbfb877..0eee4712 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserverLegacyTest.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.observer;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
@@ -328,7 +329,7 @@ public class SnapshotObserverLegacyTest extends
OperatorTestBase {
var jobStatus = status.getJobStatus();
status.getReconciliationStatus()
.serializeAndSetLastReconciledSpec(deployment.getSpec(),
deployment);
- jobStatus.setState("RUNNING");
+ jobStatus.setState(JobStatus.RUNNING);
var savepointInfo = jobStatus.getSavepointInfo();
flinkService.triggerSavepointLegacy(null,
SnapshotTriggerType.PERIODIC, deployment, conf);
@@ -364,7 +365,7 @@ public class SnapshotObserverLegacyTest extends
OperatorTestBase {
var jobStatus = status.getJobStatus();
status.getReconciliationStatus()
.serializeAndSetLastReconciledSpec(deployment.getSpec(),
deployment);
- jobStatus.setState("RUNNING");
+ jobStatus.setState(JobStatus.RUNNING);
var checkpointInfo = jobStatus.getCheckpointInfo();
var triggerId = flinkService.triggerCheckpoint(null,
CheckpointType.FULL, conf);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 2f06504b..b9adee62 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -128,7 +128,9 @@ public class ApplicationObserverTest extends
OperatorTestBase {
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
- assertEquals(JobState.RUNNING.name(),
deployment.getStatus().getJobStatus().getState());
+ assertEquals(
+ org.apache.flink.api.common.JobStatus.RUNNING,
+ deployment.getStatus().getJobStatus().getState());
assertEquals(
deployment.getStatus().getReconciliationStatus().getLastReconciledSpec(),
deployment.getStatus().getReconciliationStatus().getLastStableSpec());
@@ -137,7 +139,9 @@ public class ApplicationObserverTest extends
OperatorTestBase {
assertEquals(
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
- assertEquals(JobState.RUNNING.name(),
deployment.getStatus().getJobStatus().getState());
+ assertEquals(
+ org.apache.flink.api.common.JobStatus.RUNNING,
+ deployment.getStatus().getJobStatus().getState());
assertEquals(
deployment.getMetadata().getName(),
@@ -177,7 +181,7 @@ public class ApplicationObserverTest extends
OperatorTestBase {
JobManagerDeploymentStatus.READY,
deployment.getStatus().getJobManagerDeploymentStatus());
assertEquals(
- org.apache.flink.api.common.JobStatus.RECONCILING.name(),
+ org.apache.flink.api.common.JobStatus.RECONCILING,
deployment.getStatus().getJobStatus().getState());
assertNull(deployment.getStatus().getReconciliationStatus().getLastStableSpec());
}
@@ -488,7 +492,7 @@ public class ApplicationObserverTest extends
OperatorTestBase {
observer.observe(deployment, readyContext);
assertEquals(
- org.apache.flink.api.common.JobStatus.FAILED.name(),
+ org.apache.flink.api.common.JobStatus.FAILED,
deployment.getStatus().getJobStatus().getState());
assertEquals("last-SP",
deployment.getStatus().getJobStatus().getUpgradeSavepointPath());
assertFalse(SnapshotUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
@@ -653,8 +657,7 @@ public class ApplicationObserverTest extends
OperatorTestBase {
observer.observe(deployment, readyContext);
assertEquals(
- org.apache.flink.api.common.JobStatus.FAILED.name(),
- getJobStatus(deployment).getState());
+ org.apache.flink.api.common.JobStatus.FAILED,
getJobStatus(deployment).getState());
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(deployment)));
}
@@ -745,7 +748,7 @@ public class ApplicationObserverTest extends
OperatorTestBase {
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new
Configuration());
JobStatus jobStatus = deployment.getStatus().getJobStatus();
jobStatus.setJobName("jobname");
- jobStatus.setState(JobState.RUNNING.name());
+ jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING);
deployment.getStatus().setJobStatus(jobStatus);
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
index 1b6bdb0b..530d3584 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.java
@@ -85,12 +85,12 @@ public class FlinkSessionJobObserverTest extends
OperatorTestBase {
var jobID = sessionJob.getStatus().getJobStatus().getJobId();
Assertions.assertNotNull(jobID);
Assertions.assertEquals(
- JobStatus.RECONCILING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RECONCILING,
sessionJob.getStatus().getJobStatus().getState());
// observe with empty context will do nothing
observer.observe(sessionJob, TestUtils.createEmptyContext());
Assertions.assertEquals(
- JobStatus.RECONCILING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RECONCILING,
sessionJob.getStatus().getJobStatus().getState());
var reconStatus = sessionJob.getStatus().getReconciliationStatus();
Assertions.assertNotEquals(
@@ -99,22 +99,22 @@ public class FlinkSessionJobObserverTest extends
OperatorTestBase {
// observe with ready context
observer.observe(sessionJob, readyContext);
Assertions.assertEquals(
- JobStatus.RUNNING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RUNNING,
sessionJob.getStatus().getJobStatus().getState());
Assertions.assertEquals(
reconStatus.getLastReconciledSpec(),
reconStatus.getLastStableSpec());
flinkService.setPortReady(false);
observer.observe(sessionJob, readyContext);
Assertions.assertEquals(
- JobStatus.RECONCILING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RECONCILING,
sessionJob.getStatus().getJobStatus().getState());
-
sessionJob.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+ sessionJob.getStatus().getJobStatus().setState(JobStatus.RUNNING);
// no matched job id, update the state to unknown
flinkService.setPortReady(true);
sessionJob.getStatus().getJobStatus().setJobId(new
JobID().toHexString());
observer.observe(sessionJob, readyContext);
Assertions.assertEquals(
- JobStatus.RECONCILING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RECONCILING,
sessionJob.getStatus().getJobStatus().getState());
Assertions.assertEquals(
JobState.SUSPENDED,
sessionJob
@@ -138,13 +138,13 @@ public class FlinkSessionJobObserverTest extends
OperatorTestBase {
Assertions.assertNotNull(jobID);
Assertions.assertNotEquals(jobID, jobID2);
Assertions.assertEquals(
- JobStatus.RECONCILING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RECONCILING,
sessionJob.getStatus().getJobStatus().getState());
observer.observe(sessionJob2, readyContext);
Assertions.assertEquals(
- JobStatus.RUNNING.name(),
sessionJob2.getStatus().getJobStatus().getState());
+ JobStatus.RUNNING,
sessionJob2.getStatus().getJobStatus().getState());
observer.observe(sessionJob, readyContext);
Assertions.assertEquals(
- JobStatus.RUNNING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RUNNING,
sessionJob.getStatus().getJobStatus().getState());
// test error behaviour if job not present
flinkService.clear();
@@ -153,7 +153,7 @@ public class FlinkSessionJobObserverTest extends
OperatorTestBase {
observer.observe(sessionJob2, readyContext);
Assertions.assertEquals(
- JobStatus.RECONCILING.name(),
sessionJob2.getStatus().getJobStatus().getState());
+ JobStatus.RECONCILING,
sessionJob2.getStatus().getJobStatus().getState());
Assertions.assertTrue(
sessionJob2.getStatus().getError().contains(JobStatusObserver.JOB_NOT_FOUND_ERR));
Assertions.assertEquals(
@@ -173,14 +173,14 @@ public class FlinkSessionJobObserverTest extends
OperatorTestBase {
var jobID = sessionJob.getStatus().getJobStatus().getJobId();
Assertions.assertNotNull(jobID);
Assertions.assertEquals(
- JobStatus.RECONCILING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RECONCILING,
sessionJob.getStatus().getJobStatus().getState());
flinkService.setListJobConsumer(
(configuration) ->
Assertions.assertEquals(8088,
configuration.getInteger(RestOptions.PORT)));
observer.observe(sessionJob, readyContext);
Assertions.assertEquals(
- JobStatus.RUNNING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RUNNING,
sessionJob.getStatus().getJobStatus().getState());
}
@Test
@@ -192,11 +192,11 @@ public class FlinkSessionJobObserverTest extends
OperatorTestBase {
var jobID = sessionJob.getStatus().getJobStatus().getJobId();
Assertions.assertNotNull(jobID);
Assertions.assertEquals(
- JobStatus.RECONCILING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RECONCILING,
sessionJob.getStatus().getJobStatus().getState());
observer.observe(sessionJob, readyContext);
Assertions.assertEquals(
- JobStatus.RUNNING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RUNNING,
sessionJob.getStatus().getJobStatus().getState());
var savepointInfo =
sessionJob.getStatus().getJobStatus().getSavepointInfo();
Assertions.assertFalse(
@@ -238,10 +238,10 @@ public class FlinkSessionJobObserverTest extends
OperatorTestBase {
var jobID = sessionJob.getStatus().getJobStatus().getJobId();
Assertions.assertNotNull(jobID);
Assertions.assertEquals(
- JobStatus.RECONCILING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ JobStatus.RECONCILING,
sessionJob.getStatus().getJobStatus().getState());
observer.observe(sessionJob, readyContext);
- assertEquals(JobStatus.RUNNING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(JobStatus.RUNNING,
sessionJob.getStatus().getJobStatus().getState());
var checkpointInfo =
sessionJob.getStatus().getJobStatus().getCheckpointInfo();
assertFalse(SnapshotUtils.checkpointInProgress(sessionJob.getStatus().getJobStatus()));
@@ -280,7 +280,7 @@ public class FlinkSessionJobObserverTest extends
OperatorTestBase {
@ValueSource(booleans = {true, false})
public void testObserveAlreadySubmitted(boolean submitted) {
var sessionJob = TestUtils.buildSessionJob();
-
sessionJob.getStatus().getJobStatus().setState(JobStatus.RECONCILING.name());
+ sessionJob.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
sessionJob.getMetadata().setGeneration(10L);
var readyContext =
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
@@ -305,7 +305,7 @@ public class FlinkSessionJobObserverTest extends
OperatorTestBase {
submitted ? ReconciliationState.DEPLOYED :
ReconciliationState.UPGRADING,
sessionJob.getStatus().getReconciliationStatus().getState());
Assertions.assertEquals(
- submitted ? JobStatus.RUNNING.name() :
JobStatus.RECONCILING.name(),
+ submitted ? JobStatus.RUNNING : JobStatus.RECONCILING,
sessionJob.getStatus().getJobStatus().getState());
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index e00305d3..8bf19ff1 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -109,6 +109,9 @@ import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
+import static org.apache.flink.api.common.JobStatus.FINISHED;
+import static org.apache.flink.api.common.JobStatus.RECONCILING;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
import static
org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getCheckpointInfo;
import static
org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getJobSpec;
import static
org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils.getJobStatus;
@@ -321,7 +324,7 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
.setLastStableSpec(
deployment.getStatus().getReconciliationStatus().getLastReconciledSpec());
flinkService.setHaDataAvailable(false);
- getJobStatus(deployment).setState("RECONCILING");
+ getJobStatus(deployment).setState(RECONCILING);
try {
deployment
@@ -343,7 +346,7 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
getJobSpec(deployment).setUpgradeMode(UpgradeMode.LAST_STATE);
deployment.getSpec().setRestartNonce(200L);
flinkService.setHaDataAvailable(false);
- getJobStatus(deployment).setState("FINISHED");
+ getJobStatus(deployment).setState(FINISHED);
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
reconciler.reconcile(deployment, context);
reconciler.reconcile(deployment, context);
@@ -693,7 +696,7 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
.jobId(runningJobs.get(0).f1.getJobId().toHexString())
.jobName(runningJobs.get(0).f1.getJobName())
.updateTime(Long.toString(System.currentTimeMillis()))
- .state("RUNNING")
+ .state(RUNNING)
.build());
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
}
@@ -711,7 +714,9 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
getJobSpec(spDeployment).setSavepointTriggerNonce(ThreadLocalRandom.current().nextLong());
reconciler.reconcile(spDeployment, context);
assertEquals("savepoint_trigger_0",
getSavepointInfo(spDeployment).getTriggerId());
- assertEquals(JobState.RUNNING.name(),
getJobStatus(spDeployment).getState());
+ assertEquals(
+ org.apache.flink.api.common.JobStatus.RUNNING,
+ getJobStatus(spDeployment).getState());
// Force upgrade when savepoint is in progress.
spDeployment
@@ -724,7 +729,7 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
reconciler.reconcile(spDeployment, context);
assertEquals("savepoint_trigger_0",
getSavepointInfo(spDeployment).getTriggerId());
assertEquals(
- org.apache.flink.api.common.JobStatus.FINISHED.name(),
+ org.apache.flink.api.common.JobStatus.FINISHED,
getJobStatus(spDeployment).getState());
}
@@ -741,7 +746,7 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
FlinkDeploymentSpec spec = flinkApp.getSpec();
Configuration deployConfig = configManager.getDeployConfig(deployMeta,
spec);
-
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
+
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED);
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
reconciler
.getReconciler()
@@ -750,7 +755,7 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
String path1 = deployConfig.get(JobResultStoreOptions.STORAGE_PATH);
Assertions.assertTrue(path1.startsWith(haStoragePath));
-
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
+
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED);
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
reconciler
.getReconciler()
@@ -775,7 +780,7 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
reconciler.reconcile(deployment, context);
assertEquals(ReconciliationState.DEPLOYED, reconStatus.getState());
- getJobStatus(deployment).setState(JobState.RUNNING.name());
+ getJobStatus(deployment).setState(RUNNING);
getJobStatus(deployment)
.setJobId(flinkService.listJobs().get(0).f1.getJobId().toHexString());
@@ -951,7 +956,7 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
assertEquals(
ReconciliationState.DEPLOYED,
deployment.getStatus().getReconciliationStatus().getState());
- assertEquals("RUNNING",
deployment.getStatus().getJobStatus().getState());
+ assertEquals(RUNNING,
deployment.getStatus().getJobStatus().getState());
// Test overrides are applied correctly
var v1 = new JobVertexID();
@@ -998,7 +1003,7 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
FlinkDeploymentSpec spec = flinkApp.getSpec();
Configuration deployConfig = configManager.getDeployConfig(deployMeta,
spec);
-
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
+
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED);
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
reconciler
.getReconciler()
@@ -1022,12 +1027,20 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
@Test
public void testTerminalJmTtlOnFinished() throws Throwable {
- testTerminalJmTtl(dep ->
dep.getStatus().getJobStatus().setState("FINISHED"));
+ testTerminalJmTtl(
+ dep ->
+ dep.getStatus()
+ .getJobStatus()
+
.setState(org.apache.flink.api.common.JobStatus.FINISHED));
}
@Test
public void testTerminalJmTtlOnFailed() throws Throwable {
- testTerminalJmTtl(dep ->
dep.getStatus().getJobStatus().setState("FAILED"));
+ testTerminalJmTtl(
+ dep ->
+ dep.getStatus()
+ .getJobStatus()
+
.setState(org.apache.flink.api.common.JobStatus.FAILED));
}
public void testTerminalJmTtl(ThrowingConsumer<FlinkDeployment>
deploymentSetup)
@@ -1076,7 +1089,7 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
status.getReconciliationStatus().serializeAndSetLastReconciledSpec(spec,
flinkApp);
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
-
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
+
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED);
var deleted = new AtomicBoolean(false);
@@ -1112,9 +1125,7 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
flinkService.clear();
FlinkDeploymentStatus deploymentStatus = deployment.getStatus();
deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
- deploymentStatus
- .getJobStatus()
-
.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+ deploymentStatus.getJobStatus().setState(RECONCILING);
reconciler.reconcile(deployment, context);
Assertions.assertEquals(
MSG_RECOVERY,
flinkResourceEventCollector.events.remove().getMessage());
@@ -1298,7 +1309,7 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
ReconciliationState.ROLLED_BACK,
deployment.getStatus().getReconciliationStatus().getState());
assertEquals(1, flinkService.listJobs().size());
- assertEquals("RECONCILING",
deployment.getStatus().getJobStatus().getState());
+ assertEquals(RECONCILING,
deployment.getStatus().getJobStatus().getState());
}
@ParameterizedTest
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
index 93a25e82..9d489302 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
@@ -63,6 +63,12 @@ import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
+import static org.apache.flink.api.common.JobStatus.CANCELLING;
+import static org.apache.flink.api.common.JobStatus.FAILING;
+import static org.apache.flink.api.common.JobStatus.FINISHED;
+import static org.apache.flink.api.common.JobStatus.RECONCILING;
+import static org.apache.flink.api.common.JobStatus.RESTARTING;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -210,7 +216,10 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
.setLastStableSpec(
deployment.getStatus().getReconciliationStatus().getLastReconciledSpec());
flinkService.setHaDataAvailable(false);
- deployment.getStatus().getJobStatus().setState("RECONCILING");
+ deployment
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.RECONCILING);
Assertions.assertThrows(
UpgradeFailureException.class,
@@ -236,7 +245,10 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
deployment.getSpec().setRestartNonce(200L);
flinkService.setHaDataAvailable(false);
deployment.getStatus().getJobStatus().setUpgradeSavepointPath("finished_sp");
- deployment.getStatus().getJobStatus().setState("FINISHED");
+ deployment
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.FINISHED);
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
deployment
.getSpec()
@@ -287,7 +299,7 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
0L));
}
- deployment.getStatus().getJobStatus().setState("FINISHED");
+ deployment.getStatus().getJobStatus().setState(FINISHED);
reconciler.reconcile(deployment, context);
reconciler.reconcile(deployment, context);
@@ -503,7 +515,7 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
var jobStatus = deployment.getStatus().getJobStatus();
long now = System.currentTimeMillis();
- jobStatus.setState("RUNNING");
+ jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING);
jobStatus.setStartTime(Long.toString(now));
jobStatus.setJobId(new JobID().toString());
@@ -626,7 +638,7 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
jobStatus.setJobId(new JobID().toString());
// Running state, savepoint if possible
-
jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
+ jobStatus.setState(RUNNING);
var ctx = getResourceContext(deployment);
var deployConf = ctx.getDeployConfig(deployment.getSpec());
@@ -637,13 +649,13 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
jobReconciler.getJobUpgrade(ctx, deployConf));
// Not running (but cancellable)
-
jobStatus.setState(org.apache.flink.api.common.JobStatus.RESTARTING.name());
+ jobStatus.setState(RESTARTING);
assertEquals(
AbstractJobReconciler.JobUpgrade.lastStateUsingCancel(),
jobReconciler.getJobUpgrade(ctx, deployConf));
// Unknown / reconciling
-
jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+ jobStatus.setState(RECONCILING);
assertEquals(
AbstractJobReconciler.JobUpgrade.pendingUpgrade(),
jobReconciler.getJobUpgrade(ctx, deployConf));
@@ -690,7 +702,7 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
jobStatus.setJobId(new JobID().toString());
// Running state, savepoint if possible
-
jobStatus.setState(org.apache.flink.api.common.JobStatus.FAILING.name());
+ jobStatus.setState(FAILING);
var ctx = getResourceContext(deployment);
var deployConf = ctx.getDeployConfig(deployment.getSpec());
@@ -798,7 +810,7 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
reconciler.reconcile(deployment, context);
assertEquals(0, flinkService.getRunningCount());
assertEquals(
- org.apache.flink.api.common.JobStatus.FINISHED.name(),
+ org.apache.flink.api.common.JobStatus.FINISHED,
deployment.getStatus().getJobStatus().getState());
var snapshots =
TestUtils.getFlinkStateSnapshotsForResource(kubernetesClient, deployment);
@@ -845,11 +857,11 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
// Ready for spec changes, the reconciliation should be performed
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
reconciler.reconcile(deployment, context);
- assertEquals("CANCELLING",
deployment.getStatus().getJobStatus().getState());
+ assertEquals(CANCELLING,
deployment.getStatus().getJobStatus().getState());
String expectedSavepointPath = "savepoint_0";
var jobStatus = deployment.getStatus().getJobStatus();
- jobStatus.setState("CANCELED");
+ jobStatus.setState(org.apache.flink.api.common.JobStatus.CANCELED);
jobStatus
.getSavepointInfo()
.setLastSavepoint(Savepoint.of(expectedSavepointPath,
SnapshotTriggerType.UNKNOWN));
@@ -947,7 +959,7 @@ public class ApplicationReconcilerUpgradeModeTest extends
OperatorTestBase {
.jobId(runningJobs.get(0).f1.getJobId().toHexString())
.jobName(runningJobs.get(0).f1.getJobName())
.startTime(Long.toString(System.currentTimeMillis()))
- .state("RUNNING")
+
.state(org.apache.flink.api.common.JobStatus.RUNNING)
.build());
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
}
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
index 87f44b49..54484065 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
@@ -127,7 +127,7 @@ public class SessionReconcilerTest extends OperatorTestBase
{
FlinkDeploymentSpec spec = flinkApp.getSpec();
Configuration deployConfig = configManager.getDeployConfig(deployMeta,
spec);
-
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
+
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED);
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
reconciler
.getReconciler()
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index 0185b765..c72b5082 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -122,7 +122,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
sessionJob,
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null,
flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null,
flinkService.listJobs());
// clean up
reconciler.cleanup(
@@ -165,7 +165,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
sessionJob,
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null,
flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null,
flinkService.listJobs());
// clean up
reconciler.cleanup(
@@ -201,7 +201,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
sessionJob,
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null,
flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null,
flinkService.listJobs());
// clean up
reconciler.cleanup(
sessionJob,
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
@@ -217,7 +217,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
sessionJob,
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null,
flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null,
flinkService.listJobs());
// clean up
flinkService.setPortReady(false);
var deleteControl =
@@ -230,7 +230,10 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
flinkService.setPortReady(true);
reconciler.cleanup(
sessionJob,
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
- sessionJob.getStatus().getJobStatus().setState("CANCELED");
+ sessionJob
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.CANCELED);
deleteControl =
reconciler.cleanup(
sessionJob,
@@ -248,14 +251,17 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
sessionJob,
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null,
flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null,
flinkService.listJobs());
// clean up
flinkService.setFlinkJobTerminatedWithoutCancellation(true);
var deleteControl =
reconciler.cleanup(
sessionJob,
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
- sessionJob.getStatus().getJobStatus().setState("CANCELED");
+ sessionJob
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.CANCELED);
deleteControl =
reconciler.cleanup(
@@ -273,16 +279,19 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
sessionJob,
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null,
flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null,
flinkService.listJobs());
sessionJob.getSpec().setRestartNonce(2L);
reconciler.reconcile(
sessionJob,
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
assertEquals(CANCELED,
flinkService.listJobs().get(0).f1.getJobState());
- sessionJob.getStatus().getJobStatus().setState("CANCELED");
+ sessionJob
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.CANCELED);
reconciler.reconcile(
sessionJob,
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null,
flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null,
flinkService.listJobs());
}
@Test
@@ -294,9 +303,9 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
reconciler.reconcile(sessionJob, readyContext);
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null,
flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null,
flinkService.listJobs());
- sessionJob.getStatus().getJobStatus().setState(FAILED.name());
+ sessionJob.getStatus().getJobStatus().setState(FAILED);
reconciler.reconcile(sessionJob, readyContext);
assertEquals(2, flinkService.listJobs().size());
assertEquals(RUNNING, flinkService.listJobs().get(1).f1.getJobState());
@@ -313,7 +322,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
verifyAndSetRunningJobsToStatus(
sessionJob,
JobState.RUNNING,
- RECONCILING.name(),
+ RECONCILING,
initSavepointPath,
flinkService.listJobs());
}
@@ -326,7 +335,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
reconciler.reconcile(sessionJob, readyContext);
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null,
flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null,
flinkService.listJobs());
var statelessSessionJob = ReconciliationUtils.clone(sessionJob);
statelessSessionJob.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
@@ -334,18 +343,20 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
// job suspended first
reconciler.reconcile(statelessSessionJob, readyContext);
assertEquals(CANCELED,
flinkService.listJobs().get(0).f1.getJobState());
- verifyJobState(statelessSessionJob, JobState.RUNNING, "CANCELLING");
- statelessSessionJob.getStatus().getJobStatus().setState("CANCELED");
+ verifyJobState(
+ statelessSessionJob,
+ JobState.RUNNING,
+ org.apache.flink.api.common.JobStatus.CANCELLING);
+ statelessSessionJob
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.CANCELED);
flinkService.clear();
reconciler.reconcile(statelessSessionJob, readyContext);
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- statelessSessionJob,
- JobState.RUNNING,
- RECONCILING.name(),
- null,
- flinkService.listJobs());
+ statelessSessionJob, JobState.RUNNING, RECONCILING, null,
flinkService.listJobs());
}
@ParameterizedTest
@@ -377,17 +388,13 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
statefulSessionJob.getSpec().getJob().setParallelism(3);
verifyAndSetRunningJobsToStatus(
- statefulSessionJob,
- JobState.RUNNING,
- RECONCILING.name(),
- null,
- flinkService.listJobs());
+ statefulSessionJob, JobState.RUNNING, RECONCILING, null,
flinkService.listJobs());
reconciler.reconcile(statefulSessionJob, readyContext);
// job suspended first
assertEquals(FINISHED,
flinkService.listJobs().get(0).f1.getJobState());
- verifyJobState(statefulSessionJob, JobState.SUSPENDED, "FINISHED");
+ verifyJobState(statefulSessionJob, JobState.SUSPENDED, FINISHED);
if (legacySnapshots) {
assertEquals(
"savepoint_0",
@@ -416,7 +423,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
verifyAndSetRunningJobsToStatus(
statefulSessionJob,
JobState.RUNNING,
- RECONCILING.name(),
+ RECONCILING,
"savepoint_0",
flinkService.listJobs());
}
@@ -431,7 +438,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
var readyContext =
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
reconciler.reconcile(sessionJob, readyContext);
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null,
flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null,
flinkService.listJobs());
assertFalse(SnapshotUtils.savepointInProgress(sessionJob.getStatus().getJobStatus()));
@@ -443,12 +450,12 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
assertFalse(SnapshotUtils.savepointInProgress(sp1SessionJob.getStatus().getJobStatus()));
sp1SessionJob.getSpec().getJob().setSavepointTriggerNonce(2L);
- sp1SessionJob.getStatus().getJobStatus().setState(CREATED.name());
+ sp1SessionJob.getStatus().getJobStatus().setState(CREATED);
reconciler.reconcile(sp1SessionJob, readyContext);
// do not trigger savepoint if job is not running
assertFalse(SnapshotUtils.savepointInProgress(sp1SessionJob.getStatus().getJobStatus()));
- sp1SessionJob.getStatus().getJobStatus().setState(RUNNING.name());
+ sp1SessionJob.getStatus().getJobStatus().setState(RUNNING);
reconciler.reconcile(sp1SessionJob, readyContext);
assertTrue(SnapshotUtils.savepointInProgress(sp1SessionJob.getStatus().getJobStatus()));
@@ -505,7 +512,10 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
// running -> suspended
reconciler.reconcile(sp1SessionJob, readyContext);
- sp1SessionJob.getStatus().getJobStatus().setState("CANCELED");
+ sp1SessionJob
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.CANCELED);
// suspended -> running
reconciler.reconcile(sp1SessionJob, readyContext);
// parallelism changed
@@ -518,7 +528,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
.getJob()
.getParallelism());
verifyAndSetRunningJobsToStatus(
- sp1SessionJob, JobState.RUNNING, RECONCILING.name(), null,
flinkService.listJobs());
+ sp1SessionJob, JobState.RUNNING, RECONCILING, null,
flinkService.listJobs());
sp1SessionJob.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
@@ -555,7 +565,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
var readyContext =
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
reconciler.reconcile(sessionJob, readyContext);
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null,
flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null,
flinkService.listJobs());
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(sessionJob)));
@@ -567,12 +577,12 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(sp1SessionJob)));
getJobSpec(sp1SessionJob).setCheckpointTriggerNonce(2L);
- getJobStatus(sp1SessionJob).setState(CREATED.name());
+ getJobStatus(sp1SessionJob).setState(CREATED);
reconciler.reconcile(sp1SessionJob, readyContext);
// do not trigger checkpoint if job is not running
assertFalse(SnapshotUtils.checkpointInProgress(getJobStatus(sp1SessionJob)));
- getJobStatus(sp1SessionJob).setState(RUNNING.name());
+ getJobStatus(sp1SessionJob).setState(RUNNING);
reconciler.reconcile(sp1SessionJob, readyContext);
assertTrue(SnapshotUtils.checkpointInProgress(getJobStatus(sp1SessionJob)));
@@ -618,7 +628,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
reconciler.reconcile(sessionJob, readyContext);
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null,
flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null,
flinkService.listJobs());
var job = flinkService.listJobs().get(0);
var jobStatusMessage = job.f1;
@@ -634,13 +644,16 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
RUNNING,
jobStatusMessage.getStartTime());
// Set state which must be overwritten by cancelSessionJob
- sessionJob.getStatus().getJobStatus().setState("RUNNING");
+ sessionJob
+ .getStatus()
+ .getJobStatus()
+ .setState(org.apache.flink.api.common.JobStatus.RUNNING);
flinkService.cancelSessionJob(sessionJob, suspendMode, jobConfig);
assertEquals(1, flinkService.getCancelJobCallCount());
assertEquals(CANCELED, job.f1.getJobState());
- assertEquals(CANCELLING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(CANCELLING,
sessionJob.getStatus().getJobStatus().getState());
}
private static Stream<Arguments> cancelSavepointSessionJobParams() {
@@ -683,7 +696,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
reconciler.reconcile(sessionJob, readyContext);
assertEquals(1, flinkService.listJobs().size());
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null,
flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null,
flinkService.listJobs());
var job = flinkService.listJobs().get(0);
var jobStatusMessage = job.f1;
@@ -699,7 +712,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
fromJobStatus,
jobStatusMessage.getStartTime());
// Set state which must be overwritten by cancelSessionJob
- sessionJob.getStatus().getJobStatus().setState(fromJobStatus.name());
+ sessionJob.getStatus().getJobStatus().setState(fromJobStatus);
if (!shouldThrowException) {
flinkService.cancelSessionJob(sessionJob, SuspendMode.SAVEPOINT,
jobConfig);
@@ -723,7 +736,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
assertEquals(FINISHED, job.f1.getJobState());
}
if (!shouldThrowException) {
- assertEquals(FINISHED.name(),
sessionJob.getStatus().getJobStatus().getState());
+ assertEquals(FINISHED,
sessionJob.getStatus().getJobStatus().getState());
}
}
@@ -740,7 +753,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
private void verifyAndSetRunningJobsToStatus(
FlinkSessionJob sessionJob,
JobState expectedState,
- String jobStatusObserved,
+ org.apache.flink.api.common.JobStatus jobStatusObserved,
@Nullable String expectedSavepointPath,
List<Tuple3<String, JobStatusMessage, Configuration>> jobs) {
@@ -750,11 +763,13 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
verifyJobState(sessionJob, expectedState, jobStatusObserved);
JobStatus jobStatus = sessionJob.getStatus().getJobStatus();
jobStatus.setJobName(submittedJobInfo.f1.getJobName());
- jobStatus.setState("RUNNING");
+ jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING);
}
private void verifyJobState(
- FlinkSessionJob sessionJob, JobState expectedState, String
jobStatusObserved) {
+ FlinkSessionJob sessionJob,
+ JobState expectedState,
+ org.apache.flink.api.common.JobStatus jobStatusObserved) {
assertEquals(
expectedState,
sessionJob
@@ -773,7 +788,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
reconciler.reconcile(sessionJob, readyContext);
verifyAndSetRunningJobsToStatus(
- sessionJob, JobState.RUNNING, RECONCILING.name(), null,
flinkService.listJobs());
+ sessionJob, JobState.RUNNING, RECONCILING, null,
flinkService.listJobs());
FlinkSessionJob spSessionJob = ReconciliationUtils.clone(sessionJob);
spSessionJob
@@ -788,7 +803,9 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
assertEquals(
"savepoint_trigger_0",
spSessionJob.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
- assertEquals(JobState.RUNNING.name(),
spSessionJob.getStatus().getJobStatus().getState());
+ assertEquals(
+ org.apache.flink.api.common.JobStatus.RUNNING,
+ spSessionJob.getStatus().getJobStatus().getState());
configManager.updateDefaultConfig(
Configuration.fromMap(
@@ -802,7 +819,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
assertEquals(
"savepoint_trigger_0",
spSessionJob.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
- assertEquals("CANCELLING",
spSessionJob.getStatus().getJobStatus().getState());
+ assertEquals(CANCELLING,
spSessionJob.getStatus().getJobStatus().getState());
}
@Test
@@ -816,8 +833,7 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
ReconciliationState.DEPLOYED,
sessionJob.getStatus().getReconciliationStatus().getState());
var jobID = sessionJob.getStatus().getJobStatus().getJobId();
- Assertions.assertEquals(
- RECONCILING.name(),
sessionJob.getStatus().getJobStatus().getState());
+ Assertions.assertEquals(RECONCILING,
sessionJob.getStatus().getJobStatus().getState());
Assertions.assertEquals(jobID,
flinkService.listJobs().get(0).f1.getJobId().toString());
flinkService.setSessionJobSubmittedCallback(
@@ -831,7 +847,10 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
() -> {
// suspend
reconciler.reconcile(sessionJob, readyContext);
- sessionJob.getStatus().getJobStatus().setState("CANCELED");
+ sessionJob
+ .getStatus()
+ .getJobStatus()
+
.setState(org.apache.flink.api.common.JobStatus.CANCELED);
// upgrade
reconciler.reconcile(sessionJob, readyContext);
});
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index 514acbd4..336de17e 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -133,6 +133,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import static org.apache.flink.api.common.JobStatus.CANCELLING;
+import static org.apache.flink.api.common.JobStatus.FAILING;
+import static org.apache.flink.api.common.JobStatus.FINISHED;
+import static org.apache.flink.api.common.JobStatus.RUNNING;
import static
org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
import static
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -293,7 +297,7 @@ public class AbstractFlinkServiceTest {
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new
Configuration());
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
- deployment.getStatus().getJobStatus().setState("RUNNING");
+ deployment.getStatus().getJobStatus().setState(RUNNING);
flinkService.cancelJob(
deployment,
SuspendMode.STATELESS,
@@ -303,7 +307,7 @@ public class AbstractFlinkServiceTest {
assertEquals(jobID, cancelFuture.get());
assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
assertNull(jobStatus.getUpgradeSavepointPath());
- assertEquals("FINISHED", jobStatus.getState());
+ assertEquals(FINISHED, jobStatus.getState());
assertEquals(
List.of(
Tuple2.of(
@@ -336,7 +340,7 @@ public class AbstractFlinkServiceTest {
var job = TestUtils.buildSessionJob();
var jobStatus = job.getStatus().getJobStatus();
jobStatus.setJobId(jobID.toHexString());
- jobStatus.setState("RUNNING");
+ jobStatus.setState(RUNNING);
ReconciliationUtils.updateStatusForDeployedSpec(job, new
Configuration());
if (statusCode == 500) {
@@ -345,10 +349,10 @@ public class AbstractFlinkServiceTest {
() ->
flinkService.cancelSessionJob(
job, SuspendMode.STATELESS, new
Configuration()));
- assertEquals("RUNNING", jobStatus.getState());
+ assertEquals(RUNNING, jobStatus.getState());
} else {
flinkService.cancelSessionJob(job, SuspendMode.STATELESS, new
Configuration());
- assertEquals("CANCELLING", jobStatus.getState());
+ assertEquals(CANCELLING, jobStatus.getState());
}
}
@@ -380,7 +384,7 @@ public class AbstractFlinkServiceTest {
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
JobStatus jobStatus = deployment.getStatus().getJobStatus();
jobStatus.setJobId(jobID.toHexString());
-
jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
+ jobStatus.setState(RUNNING);
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new
Configuration());
var result =
@@ -396,7 +400,7 @@ public class AbstractFlinkServiceTest {
assertFalse(stopWithSavepointFuture.get().f1);
assertEquals(savepointPath, stopWithSavepointFuture.get().f2);
- assertEquals(jobStatus.getState(),
org.apache.flink.api.common.JobStatus.FINISHED.name());
+ assertEquals(jobStatus.getState(),
org.apache.flink.api.common.JobStatus.FINISHED);
assertEquals(
deployment.getStatus().getJobManagerDeploymentStatus(),
deleteAfterSavepoint
@@ -443,7 +447,7 @@ public class AbstractFlinkServiceTest {
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
JobStatus jobStatus = deployment.getStatus().getJobStatus();
jobStatus.setJobId(jobID.toHexString());
-
jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
+ jobStatus.setState(RUNNING);
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new
Configuration());
if (drainOnSavepoint) {
@@ -467,7 +471,7 @@ public class AbstractFlinkServiceTest {
assertTrue(stopWithSavepointFuture.isDone());
assertEquals(jobID, stopWithSavepointFuture.get().f0);
- assertEquals(jobStatus.getState(),
org.apache.flink.api.common.JobStatus.FINISHED.name());
+ assertEquals(jobStatus.getState(),
org.apache.flink.api.common.JobStatus.FINISHED);
if (drainOnSavepoint) {
assertTrue(stopWithSavepointFuture.get().f1);
@@ -509,7 +513,7 @@ public class AbstractFlinkServiceTest {
JobStatus jobStatus = job.getStatus().getJobStatus();
jobStatus.setJobId(jobID.toHexString());
-
jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
+ jobStatus.setState(RUNNING);
ReconciliationUtils.updateStatusForDeployedSpec(job, new
Configuration());
if (drainOnSavepoint) {
@@ -527,7 +531,7 @@ public class AbstractFlinkServiceTest {
var result = flinkService.cancelSessionJob(job, SuspendMode.SAVEPOINT,
deployConf);
assertTrue(stopWithSavepointFuture.isDone());
assertEquals(jobID, stopWithSavepointFuture.get().f0);
- assertEquals(jobStatus.getState(),
org.apache.flink.api.common.JobStatus.FINISHED.name());
+ assertEquals(jobStatus.getState(),
org.apache.flink.api.common.JobStatus.FINISHED);
assertEquals(savepointPath, result.getSavepointPath().get());
@@ -549,15 +553,13 @@ public class AbstractFlinkServiceTest {
JobID jobID = JobID.generate();
JobStatus jobStatus = deployment.getStatus().getJobStatus();
jobStatus.setJobId(jobID.toHexString());
- jobStatus.setState("FAILING");
+ jobStatus.setState(FAILING);
flinkService.cancelJob(
deployment, SuspendMode.CANCEL,
configManager.getObserveConfig(deployment), false);
assertTrue(flinkService.haDeleted.isEmpty());
assertTrue(flinkService.deleted.isEmpty());
- assertEquals("CANCELLING", jobStatus.getState());
- assertNull(jobStatus.getSavepointInfo().getLastSavepoint());
- assertNull(jobStatus.getUpgradeSavepointPath());
+ assertEquals(CANCELLING, jobStatus.getState());
}
@Test
@@ -752,7 +754,7 @@ public class AbstractFlinkServiceTest {
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
JobStatus jobStatus = deployment.getStatus().getJobStatus();
jobStatus.setJobId(jobID.toHexString());
-
jobStatus.setState(org.apache.flink.api.common.JobStatus.RUNNING.name());
+ jobStatus.setState(RUNNING);
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new
Configuration());
jobStatus.setJobId(jobID.toString());
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index 23f179b2..352c726c 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.service;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
@@ -288,7 +289,7 @@ public class NativeFlinkServiceTest {
Map.of(v1.toHexString(), "4", v2.toHexString(), "1"));
spec.setFlinkConfiguration(appConfig.toMap());
- flinkDep.getStatus().getJobStatus().setState("RUNNING");
+ flinkDep.getStatus().getJobStatus().setState(JobStatus.RUNNING);
current.set(
Map.of(
@@ -357,16 +358,22 @@ public class NativeFlinkServiceTest {
// Make sure we only try to rescale non-terminal
testScaleConditionDep(
- flinkDep, service, d ->
d.getStatus().getJobStatus().setState("FAILED"), false);
+ flinkDep,
+ service,
+ d -> d.getStatus().getJobStatus().setState(JobStatus.FAILED),
+ false);
testScaleConditionDep(
flinkDep,
service,
- d -> d.getStatus().getJobStatus().setState("RECONCILING"),
+ d ->
d.getStatus().getJobStatus().setState(JobStatus.RECONCILING),
false);
testScaleConditionDep(
- flinkDep, service, d ->
d.getStatus().getJobStatus().setState("RUNNING"), true);
+ flinkDep,
+ service,
+ d -> d.getStatus().getJobStatus().setState(JobStatus.RUNNING),
+ true);
testScaleConditionDep(flinkDep, service, d ->
d.getSpec().setJob(null), false);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
index 1a3dc8e6..7b05226b 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkStateSnapshotUtilsTest.java
@@ -42,6 +42,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
+import static org.apache.flink.api.common.JobStatus.FAILED;
import static org.apache.flink.kubernetes.operator.TestUtils.reconcileSpec;
import static
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.COMPLETED;
import static
org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus.State.IN_PROGRESS;
@@ -243,7 +244,7 @@ public class FlinkStateSnapshotUtilsTest {
assertFalse(result);
assertThat(eventCollector.events).isEmpty();
- deployment.getStatus().getJobStatus().setState("FAILED");
+ deployment.getStatus().getJobStatus().setState(FAILED);
result =
FlinkStateSnapshotUtils.abandonSnapshotIfJobNotRunning(
client, snapshot, deployment, eventRecorder);
@@ -262,7 +263,7 @@ public class FlinkStateSnapshotUtilsTest {
@Test
public void testAbandonSnapshotIfJobNotRunningJobFailed() {
var deployment = initDeployment();
- deployment.getStatus().getJobStatus().setState("FAILED");
+ deployment.getStatus().getJobStatus().setState(FAILED);
var snapshot = initSavepoint(IN_PROGRESS, null);
var eventCollector = new FlinkStateSnapshotEventCollector();
var eventRecorder = new EventRecorder((x, y) -> {}, eventCollector);
@@ -341,7 +342,7 @@ public class FlinkStateSnapshotUtilsTest {
private static FlinkDeployment initDeployment() {
FlinkDeployment deployment =
TestUtils.buildApplicationCluster(FlinkVersion.v1_19);
-
deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+ deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING);
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
reconcileSpec(deployment);
return deployment;
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
index 37f05309..5b3cb541 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.java
@@ -359,7 +359,7 @@ public class SnapshotUtilsTest {
.getMetadata()
.setCreationTimestamp(Instant.now().minus(Duration.ofMinutes(15)).toString());
-
deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
+ deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING);
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
reconcileSpec(deployment);
return deployment;
diff --git
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index b4c2d4c0..6042799d 100644
---
a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++
b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -10355,6 +10355,18 @@ spec:
startTime:
type: string
state:
+ enum:
+ - CANCELED
+ - CANCELLING
+ - CREATED
+ - FAILED
+ - FAILING
+ - FINISHED
+ - INITIALIZING
+ - RECONCILING
+ - RESTARTING
+ - RUNNING
+ - SUSPENDED
type: string
updateTime:
type: string
diff --git
a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
index b7ea567d..431787c7 100644
---
a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
+++
b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
@@ -197,6 +197,18 @@ spec:
startTime:
type: string
state:
+ enum:
+ - CANCELED
+ - CANCELLING
+ - CREATED
+ - FAILED
+ - FAILING
+ - FINISHED
+ - INITIALIZING
+ - RECONCILING
+ - RESTARTING
+ - RUNNING
+ - SUSPENDED
type: string
updateTime:
type: string