This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new a738c81  [FLINK-26507] Allow session and last-state jobs to reconcile in any state
a738c81 is described below

commit a738c815a12fc647d24cbbd10a2f4f0a5b9f6900
Author: Gyula Fora <[email protected]>
AuthorDate: Tue Mar 8 15:17:51 2022 +0100

    [FLINK-26507] Allow session and last-state jobs to reconcile in any state
---
 .../controller/FlinkDeploymentController.java      |  26 ++--
 .../observer/JobManagerDeploymentStatus.java       |   6 +-
 .../kubernetes/operator/observer/Observer.java     |  68 +++++------
 .../operator/reconciler/JobReconciler.java         |  77 ++++++++----
 .../kubernetes/operator/reconciler/Reconciler.java |   5 +-
 .../operator/reconciler/ReconciliationUtils.java   |   3 +-
 .../operator/reconciler/SessionReconciler.java     |  12 +-
 .../kubernetes/operator/utils/FlinkUtils.java      |   2 +-
 .../kubernetes/operator/utils/SavepointUtils.java  |   7 +-
 .../flink/kubernetes/operator/TestUtils.java       |  16 ++-
 .../kubernetes/operator/TestingFlinkService.java   |   5 +
 .../controller/FlinkDeploymentControllerTest.java  | 134 +++++++++++++++++++--
 .../kubernetes/operator/observer/ObserverTest.java |  47 ++++----
 .../operator/reconciler/JobReconcilerTest.java     |   2 +
 .../validation/DeploymentValidatorTest.java        |  11 +-
 15 files changed, 288 insertions(+), 133 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index 2a2a8d3..195defd 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -119,24 +119,17 @@ public class FlinkDeploymentController
                 FlinkUtils.getEffectiveConfig(flinkApp, 
defaultConfig.getFlinkConfig());
 
         try {
-            boolean readyToReconcile = observer.observe(flinkApp, context, 
effectiveConfig);
-            if (!readyToReconcile) {
-                return flinkApp.getStatus()
-                        .getJobManagerDeploymentStatus()
-                        .toUpdateControl(flinkApp, operatorConfiguration);
-            }
-
-            UpdateControl<FlinkDeployment> updateControl =
-                    reconcilerFactory
-                            .getOrCreate(flinkApp)
-                            .reconcile(operatorNamespace, flinkApp, context, 
effectiveConfig);
-            return updateControl;
+            observer.observe(flinkApp, context, effectiveConfig);
+            reconcilerFactory
+                    .getOrCreate(flinkApp)
+                    .reconcile(operatorNamespace, flinkApp, context, 
effectiveConfig);
         } catch (DeploymentFailedException dfe) {
             handleDeploymentFailed(flinkApp, dfe);
-            return UpdateControl.updateStatus(flinkApp);
         } catch (Exception e) {
             throw new ReconciliationException(e);
         }
+
+        return getUpdateControl(flinkApp);
     }
 
     private void handleDeploymentFailed(FlinkDeployment flinkApp, 
DeploymentFailedException dfe) {
@@ -157,6 +150,13 @@ public class FlinkDeploymentController
                 .create(event);
     }
 
+    private UpdateControl<FlinkDeployment> getUpdateControl(FlinkDeployment 
deployment) {
+        return deployment
+                .getStatus()
+                .getJobManagerDeploymentStatus()
+                .toUpdateControl(deployment, operatorConfiguration);
+    }
+
     @Override
     public List<EventSource> 
prepareEventSources(EventSourceContext<FlinkDeployment> ctx) {
         Preconditions.checkNotNull(controllerConfig, "Controller config cannot 
be null");
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
index 35a395d..4a7defc 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobManagerDeploymentStatus.java
@@ -48,6 +48,8 @@ public enum JobManagerDeploymentStatus {
         switch (this) {
             case DEPLOYING:
             case READY:
+            case MISSING:
+            case ERROR:
                 return UpdateControl.updateStatus(flinkDeployment)
                         .rescheduleAfter(
                                 
operatorConfiguration.getReconcileIntervalInSec(),
@@ -57,10 +59,8 @@ public enum JobManagerDeploymentStatus {
                         .rescheduleAfter(
                                 
operatorConfiguration.getPortCheckIntervalInSec(),
                                 TimeUnit.SECONDS);
-            case MISSING:
-            case ERROR:
             default:
-                return UpdateControl.noUpdate();
+                throw new RuntimeException("Unknown status: " + this);
         }
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
index 46fb983..fc0b894 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/Observer.java
@@ -48,6 +48,8 @@ public class Observer {
 
     private static final Logger LOG = LoggerFactory.getLogger(Observer.class);
 
+    public static final String JOB_STATE_UNKNOWN = "UNKNOWN";
+
     private final FlinkService flinkService;
     private final FlinkOperatorConfiguration operatorConfiguration;
 
@@ -56,10 +58,20 @@ public class Observer {
         this.operatorConfiguration = operatorConfiguration;
     }
 
-    public boolean observe(
-            FlinkDeployment flinkApp, Context context, Configuration 
effectiveConfig) {
+    public void observe(FlinkDeployment flinkApp, Context context, 
Configuration effectiveConfig) {
         observeJmDeployment(flinkApp, context, effectiveConfig);
-        return isReadyToReconcile(flinkApp, effectiveConfig);
+        if (isApplicationClusterReady(flinkApp)) {
+            boolean jobFound = observeFlinkJobStatus(flinkApp, 
effectiveConfig);
+            if (jobFound) {
+                observeSavepointStatus(flinkApp, effectiveConfig);
+            }
+        }
+    }
+
+    private boolean isApplicationClusterReady(FlinkDeployment dep) {
+        return dep.getSpec().getJob() != null
+                && dep.getStatus().getJobManagerDeploymentStatus()
+                        == JobManagerDeploymentStatus.READY;
     }
 
     private void observeJmDeployment(
@@ -123,12 +135,6 @@ public class Observer {
     }
 
     private boolean observeFlinkJobStatus(FlinkDeployment flinkApp, 
Configuration effectiveConfig) {
-
-        // No need to observe job status for session clusters
-        if (flinkApp.getSpec().getJob() == null) {
-            return true;
-        }
-
         LOG.info("Getting job statuses for {}", 
flinkApp.getMetadata().getName());
         FlinkDeploymentStatus flinkAppStatus = flinkApp.getStatus();
 
@@ -137,32 +143,32 @@ public class Observer {
             clusterJobStatuses = flinkService.listJobs(effectiveConfig);
         } catch (Exception e) {
             LOG.error("Exception while listing jobs", e);
-            flinkAppStatus.getJobStatus().setState("UNKNOWN");
+            flinkAppStatus.getJobStatus().setState(JOB_STATE_UNKNOWN);
             return false;
         }
         if (clusterJobStatuses.isEmpty()) {
             LOG.info("No jobs found on {} yet", 
flinkApp.getMetadata().getName());
+            flinkAppStatus.getJobStatus().setState(JOB_STATE_UNKNOWN);
             return false;
-        } else {
-            updateJobStatus(flinkAppStatus.getJobStatus(), new 
ArrayList<>(clusterJobStatuses));
-            LOG.info("Job statuses updated for {}", 
flinkApp.getMetadata().getName());
-            return true;
         }
+
+        updateJobStatus(flinkAppStatus.getJobStatus(), new 
ArrayList<>(clusterJobStatuses));
+        LOG.info("Job statuses updated for {}", 
flinkApp.getMetadata().getName());
+        return true;
     }
 
-    private boolean observeSavepointStatus(
-            FlinkDeployment flinkApp, Configuration effectiveConfig) {
+    private void observeSavepointStatus(FlinkDeployment flinkApp, 
Configuration effectiveConfig) {
         SavepointInfo savepointInfo = 
flinkApp.getStatus().getJobStatus().getSavepointInfo();
-        if (savepointInfo.getTriggerId() == null) {
+        if (!SavepointUtils.savepointInProgress(flinkApp)) {
             LOG.debug("Checkpointing not in progress");
-            return true;
+            return;
         }
         SavepointFetchResult savepointFetchResult;
         try {
             savepointFetchResult = flinkService.fetchSavepointInfo(flinkApp, 
effectiveConfig);
         } catch (Exception e) {
             LOG.error("Exception while fetching savepoint info", e);
-            return false;
+            return;
         }
 
         if (!savepointFetchResult.isTriggered()) {
@@ -173,37 +179,17 @@ public class Observer {
                 LOG.error(errorMsg);
                 savepointInfo.setTriggerId(null);
                 ReconciliationUtils.updateForReconciliationError(flinkApp, 
errorMsg);
-                return false;
+                return;
             }
             LOG.info("Savepoint operation not running, waiting within grace 
period");
         }
         if (savepointFetchResult.getSavepoint() == null) {
             LOG.info("Savepoint not completed yet");
-            return false;
+            return;
         }
 
         savepointInfo.setLastSavepoint(savepointFetchResult.getSavepoint());
         savepointInfo.setTriggerId(null);
-        return true;
-    }
-
-    private boolean isReadyToReconcile(FlinkDeployment flinkApp, Configuration 
effectiveConfig) {
-        JobManagerDeploymentStatus jmDeploymentStatus =
-                flinkApp.getStatus().getJobManagerDeploymentStatus();
-
-        switch (jmDeploymentStatus) {
-            case READY:
-                return observeFlinkJobStatus(flinkApp, effectiveConfig)
-                        && observeSavepointStatus(flinkApp, effectiveConfig);
-            case MISSING:
-            case ERROR:
-                return true;
-            case DEPLOYING:
-            case DEPLOYED_NOT_READY:
-                return false;
-            default:
-                throw new RuntimeException("Unknown status: " + 
jmDeploymentStatus);
-        }
     }
 
     /** Update previous job status based on the job list from the cluster. */
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index 6f6b597..df9a8d8 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
 import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
@@ -36,12 +37,12 @@ import 
org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
-import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
-import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.kubernetes.operator.observer.Observer.JOB_STATE_UNKNOWN;
 
 /**
  * Reconciler responsible for handling the job lifecycle according to the 
desired and current
@@ -59,12 +60,13 @@ public class JobReconciler extends BaseReconciler {
     }
 
     @Override
-    public UpdateControl<FlinkDeployment> reconcile(
+    public void reconcile(
             String operatorNamespace,
             FlinkDeployment flinkApp,
             Context context,
             Configuration effectiveConfig)
             throws Exception {
+
         FlinkDeploymentSpec lastReconciledSpec =
                 
flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec();
         JobSpec jobSpec = flinkApp.getSpec().getJob();
@@ -76,14 +78,18 @@ public class JobReconciler extends BaseReconciler {
             IngressUtils.updateIngressRules(
                     flinkApp, effectiveConfig, operatorNamespace, 
kubernetesClient, false);
             ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp);
-            return JobManagerDeploymentStatus.DEPLOYING.toUpdateControl(
-                    flinkApp, operatorConfiguration);
+            return;
+        }
+
+        if (SavepointUtils.savepointInProgress(flinkApp)) {
+            LOG.info(
+                    "Savepoint currently in progress for {}, delaying 
reconcile...",
+                    flinkApp.getMetadata().getName());
+            return;
         }
 
-        // TODO: following assumes that current job is running
-        // What if it never enters running state due to bad deployment?
         boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec);
-        if (specChanged) {
+        if (specChanged && readyForSpecChanges(flinkApp)) {
             JobState currentJobState = lastReconciledSpec.getJob().getState();
             JobState desiredJobState = jobSpec.getState();
 
@@ -94,7 +100,7 @@ public class JobReconciler extends BaseReconciler {
                 }
                 if (desiredJobState.equals(JobState.SUSPENDED)) {
                     printCancelLogs(upgradeMode, 
flinkApp.getMetadata().getName());
-                    cancelJob(flinkApp, upgradeMode, effectiveConfig);
+                    suspendJob(flinkApp, upgradeMode, effectiveConfig);
                 }
             }
             if (currentJobState == JobState.SUSPENDED && desiredJobState == 
JobState.RUNNING) {
@@ -115,14 +121,29 @@ public class JobReconciler extends BaseReconciler {
                 }
             }
             ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp);
-        } else if (SavepointUtils.shouldTriggerSavepoint(flinkApp)) {
+        } else if (SavepointUtils.shouldTriggerSavepoint(flinkApp) && 
isJobRunning(flinkApp)) {
             triggerSavepoint(flinkApp, effectiveConfig);
             ReconciliationUtils.updateSavepointReconciliationSuccess(flinkApp);
         }
+    }
+
+    private boolean readyForSpecChanges(FlinkDeployment deployment) {
+        if (deployment.getSpec().getJob().getUpgradeMode() != 
UpgradeMode.SAVEPOINT) {
+            // Only savepoint upgrade mode needs a running job
+            return true;
+        }
+        return deployment.getStatus().getJobManagerDeploymentStatus()
+                        == JobManagerDeploymentStatus.MISSING
+                || isJobRunning(deployment);
+    }
 
-        return UpdateControl.updateStatus(flinkApp)
-                .rescheduleAfter(
-                        operatorConfiguration.getReconcileIntervalInSec(), 
TimeUnit.SECONDS);
+    private boolean isJobRunning(FlinkDeployment deployment) {
+        FlinkDeploymentStatus status = deployment.getStatus();
+        JobManagerDeploymentStatus deploymentStatus = 
status.getJobManagerDeploymentStatus();
+        return deploymentStatus == JobManagerDeploymentStatus.READY
+                && org.apache.flink.api.common.JobStatus.RUNNING
+                        .name()
+                        .equals(status.getJobStatus().getState());
     }
 
     private void deployFlinkJob(
@@ -134,13 +155,15 @@ public class JobReconciler extends BaseReconciler {
             
effectiveConfig.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
         }
         flinkService.submitApplicationCluster(flinkApp, effectiveConfig);
+        flinkApp.getStatus().getJobStatus().setState(JOB_STATE_UNKNOWN);
+        
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
     }
 
     private void upgradeFlinkJob(FlinkDeployment flinkApp, Configuration 
effectiveConfig)
             throws Exception {
         LOG.info("Upgrading running job");
         final Optional<String> savepoint =
-                cancelJob(flinkApp, 
flinkApp.getSpec().getJob().getUpgradeMode(), effectiveConfig);
+                suspendJob(flinkApp, 
flinkApp.getSpec().getJob().getUpgradeMode(), effectiveConfig);
         deployFlinkJob(flinkApp, effectiveConfig, savepoint);
     }
 
@@ -169,16 +192,24 @@ public class JobReconciler extends BaseReconciler {
         }
     }
 
-    private Optional<String> cancelJob(
+    private Optional<String> suspendJob(
             FlinkDeployment flinkApp, UpgradeMode upgradeMode, Configuration 
effectiveConfig)
             throws Exception {
-        Optional<String> savepointOpt =
-                flinkService.cancelJob(
-                        
JobID.fromHexString(flinkApp.getStatus().getJobStatus().getJobId()),
-                        upgradeMode,
-                        effectiveConfig);
+
+        Optional<String> savepointOpt = Optional.empty();
+        if (upgradeMode == UpgradeMode.STATELESS) {
+            shutdown(flinkApp, effectiveConfig);
+        } else {
+            String jobIdString = 
flinkApp.getStatus().getJobStatus().getJobId();
+            savepointOpt =
+                    flinkService.cancelJob(
+                            jobIdString != null ? 
JobID.fromHexString(jobIdString) : null,
+                            upgradeMode,
+                            effectiveConfig);
+        }
+
         JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
-        jobStatus.setState("suspended");
+        jobStatus.setState(JobState.SUSPENDED.name());
         
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
         savepointOpt.ifPresent(
                 location -> {
@@ -189,9 +220,7 @@ public class JobReconciler extends BaseReconciler {
 
     @Override
     protected void shutdown(FlinkDeployment flinkApp, Configuration 
effectiveConfig) {
-        if (org.apache.flink.api.common.JobStatus.RUNNING
-                .name()
-                
.equalsIgnoreCase(flinkApp.getStatus().getJobStatus().getState())) {
+        if (isJobRunning(flinkApp)) {
             LOG.info("Job is running, attempting graceful shutdown.");
             try {
                 flinkService.cancelJob(
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java
index f07a9d8..f42405a 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/Reconciler.java
@@ -23,7 +23,6 @@ import 
org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
-import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 
 /** The interface of reconciler. */
 public interface Reconciler {
@@ -35,10 +34,8 @@ public interface Reconciler {
      * @param flinkApp the FlinkDeployment resource that has been created or 
updated
      * @param context the context with which the operation is executed
      * @param effectiveConfig the effective config of the flinkApp
-     * @return UpdateControl to manage updates on the custom resource (usually 
the status) after
-     *     reconciliation.
      */
-    UpdateControl<FlinkDeployment> reconcile(
+    void reconcile(
             String operatorNamespace,
             FlinkDeployment flinkApp,
             Context context,
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index d429f66..aa58336 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -34,7 +34,8 @@ public class ReconciliationUtils {
         reconciliationStatus.setSuccess(true);
         reconciliationStatus.setError(null);
         FlinkDeploymentSpec clonedSpec = clone(flinkApp.getSpec());
-        if (reconciliationStatus.getLastReconciledSpec() != null) {
+        if (reconciliationStatus.getLastReconciledSpec() != null
+                && reconciliationStatus.getLastReconciledSpec().getJob() != 
null) {
             long oldSavepointTriggerNonce =
                     reconciliationStatus
                             .getLastReconciledSpec()
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
index a4b64ce..58bc29f 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/SessionReconciler.java
@@ -21,14 +21,12 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
+import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
-import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
-
-import java.util.concurrent.TimeUnit;
 
 /**
  * Reconciler responsible for handling the session cluster lifecycle according 
to the desired and
@@ -44,7 +42,7 @@ public class SessionReconciler extends BaseReconciler {
     }
 
     @Override
-    public UpdateControl<FlinkDeployment> reconcile(
+    public void reconcile(
             String operatorNamespace,
             FlinkDeployment flinkApp,
             Context context,
@@ -56,6 +54,8 @@ public class SessionReconciler extends BaseReconciler {
 
         if (lastReconciledSpec == null) {
             flinkService.submitSessionCluster(flinkApp, effectiveConfig);
+            flinkApp.getStatus()
+                    
.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
             IngressUtils.updateIngressRules(
                     flinkApp, effectiveConfig, operatorNamespace, 
kubernetesClient, false);
         }
@@ -65,15 +65,13 @@ public class SessionReconciler extends BaseReconciler {
             upgradeSessionCluster(flinkApp, effectiveConfig);
         }
         ReconciliationUtils.updateForSpecReconciliationSuccess(flinkApp);
-        return UpdateControl.updateStatus(flinkApp)
-                .rescheduleAfter(
-                        operatorConfiguration.getReconcileIntervalInSec(), 
TimeUnit.SECONDS);
     }
 
     private void upgradeSessionCluster(FlinkDeployment flinkApp, Configuration 
effectiveConfig)
             throws Exception {
         flinkService.stopSessionCluster(flinkApp, effectiveConfig, false);
         flinkService.submitSessionCluster(flinkApp, effectiveConfig);
+        
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
     }
 
     @Override
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 868b946..38da91f 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -177,7 +177,7 @@ public class FlinkUtils {
                                 
.withLabels(KubernetesUtils.getJobManagerSelectors(clusterId))
                                 .list();
 
-                if (jmPodList.getItems().isEmpty()) {
+                if (jmPodList == null || jmPodList.getItems().isEmpty()) {
                     jobManagerRunning = false;
                 }
             }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
index 9366472..8fe80e3 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/SavepointUtils.java
@@ -25,8 +25,13 @@ import java.util.concurrent.TimeUnit;
 
 /** Savepoint utilities. */
 public class SavepointUtils {
+
+    public static boolean savepointInProgress(FlinkDeployment flinkDeployment) 
{
+        return 
flinkDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId() != 
null;
+    }
+
     public static boolean shouldTriggerSavepoint(FlinkDeployment 
flinkDeployment) {
-        if 
(flinkDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId() 
!= null) {
+        if (savepointInProgress(flinkDeployment)) {
             return false;
         }
         return flinkDeployment.getSpec().getJob().getSavepointTriggerNonce()
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index d76b98a..3bb1251 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -17,7 +17,9 @@
 
 package org.apache.flink.kubernetes.operator;
 
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
@@ -25,6 +27,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.Resource;
 import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 
 import io.fabric8.kubernetes.api.model.Container;
@@ -39,7 +42,9 @@ import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /** Testing utilities. */
@@ -72,19 +77,26 @@ public class TestUtils {
                         JobSpec.builder()
                                 .jarURI(SAMPLE_JAR)
                                 .parallelism(1)
+                                .upgradeMode(UpgradeMode.STATELESS)
                                 .state(JobState.RUNNING)
                                 .build());
         return deployment;
     }
 
     public static FlinkDeploymentSpec getTestFlinkDeploymentSpec() {
+        Map<String, String> conf = new HashMap<>();
+        conf.put(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2");
+        conf.put(
+                HighAvailabilityOptions.HA_MODE.key(),
+                KubernetesHaServicesFactory.class.getCanonicalName());
+        conf.put(HighAvailabilityOptions.HA_STORAGE_PATH.key(), "test");
+
         return FlinkDeploymentSpec.builder()
                 .image(IMAGE)
                 .imagePullPolicy(IMAGE_POLICY)
                 .serviceAccount(SERVICE_ACCOUNT)
                 .flinkVersion(FLINK_VERSION)
-                .flinkConfiguration(
-                        
Collections.singletonMap(TaskManagerOptions.NUM_TASK_SLOTS.key(), "2"))
+                .flinkConfiguration(conf)
                 .jobManager(new JobManagerSpec(new Resource(1, "2048m"), 1, 
null))
                 .taskManager(new TaskManagerSpec(new Resource(1, "2048m"), 
null))
                 .build();
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 5da2ec5..69a7f4c 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -89,6 +89,11 @@ public class TestingFlinkService extends FlinkService {
     public Optional<String> cancelJob(JobID jobID, UpgradeMode upgradeMode, 
Configuration conf)
             throws Exception {
 
+        if (upgradeMode == UpgradeMode.LAST_STATE) {
+            jobs.clear();
+            return Optional.empty();
+        }
+
         if (!jobs.removeIf(js -> js.f1.getJobId().equals(jobID))) {
             throw new Exception("Job not found");
         }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 04e9925..a8ab582 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -42,7 +42,6 @@ import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import io.javaoperatorsdk.operator.processing.event.source.EventSource;
-import okhttp3.mockwebserver.MockWebServer;
 import okhttp3.mockwebserver.RecordedRequest;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -50,14 +49,13 @@ import org.junit.jupiter.api.Test;
 
 import java.net.HttpURLConnection;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -78,7 +76,7 @@ public class FlinkDeploymentControllerTest {
     @BeforeEach
     public void setup() {
         flinkService = new TestingFlinkService();
-        mockServer = new KubernetesMockServer(new MockWebServer(), new HashMap<>(), true);
+        mockServer = new KubernetesMockServer();
         mockServer.init();
         kubernetesClient = mockServer.createClient();
         testController = createTestController(kubernetesClient, flinkService);
@@ -156,7 +154,6 @@ public class FlinkDeploymentControllerTest {
 
     @Test
     public void verifyFailedDeployment() throws Exception {
-
         mockServer
                 .expect()
                 .post()
@@ -169,11 +166,15 @@ public class FlinkDeploymentControllerTest {
         FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
         UpdateControl<FlinkDeployment> updateControl;
 
-        updateControl =
-                testController.reconcile(
-                        appCluster, TestUtils.createContextWithFailedJobManagerDeployment());
+        updateControl = testController.reconcile(appCluster, TestUtils.createEmptyContext());
+        testController.reconcile(
+                appCluster, TestUtils.createContextWithFailedJobManagerDeployment());
         assertTrue(updateControl.isUpdateStatus());
-        assertEquals(Optional.empty(), updateControl.getScheduleDelay());
+        assertEquals(
+                JobManagerDeploymentStatus.ERROR
+                        .toUpdateControl(appCluster, operatorConfiguration)
+                        .getScheduleDelay(),
+                updateControl.getScheduleDelay());
 
         RecordedRequest recordedRequest = mockServer.getLastRequest();
         assertEquals("POST", recordedRequest.getMethod());
@@ -265,6 +266,9 @@ public class FlinkDeploymentControllerTest {
         assertEquals(1, jobs.size());
         assertEquals("s0", jobs.get(0).f0);
 
+        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, context);
+
         // Upgrade job
         appCluster = ReconciliationUtils.clone(appCluster);
         appCluster.getSpec().getJob().setParallelism(100);
@@ -272,7 +276,7 @@ public class FlinkDeploymentControllerTest {
         UpdateControl<FlinkDeployment> updateControl =
                 testController.reconcile(appCluster, context);
         assertEquals(
-                JobManagerDeploymentStatus.DEPLOYED_NOT_READY
+                JobManagerDeploymentStatus.DEPLOYING
                         .toUpdateControl(appCluster, operatorConfiguration)
                         .getScheduleDelay(),
                 updateControl.getScheduleDelay());
@@ -281,6 +285,9 @@ public class FlinkDeploymentControllerTest {
         assertEquals(1, jobs.size());
         assertEquals(null, jobs.get(0).f0);
 
+        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, context);
+
         // Suspend job
         appCluster = ReconciliationUtils.clone(appCluster);
         appCluster.getSpec().getJob().setState(JobState.SUSPENDED);
@@ -296,6 +303,113 @@ public class FlinkDeploymentControllerTest {
     }
 
     @Test
+    public void testUpgradeNotReadyCluster() {
+        testUpgradeNotReadyCluster(TestUtils.buildSessionCluster(), true);
+
+        FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
+        appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
+        testUpgradeNotReadyCluster(appCluster, true);
+
+        appCluster = TestUtils.buildApplicationCluster();
+        appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+        testUpgradeNotReadyCluster(appCluster, true);
+
+        appCluster = TestUtils.buildApplicationCluster();
+        appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+        testUpgradeNotReadyCluster(appCluster, false);
+    }
+
+    public void testUpgradeNotReadyCluster(FlinkDeployment appCluster, boolean allowUpgrade) {
+        mockServer
+                .expect()
+                .delete()
+                .withPath("/apis/apps/v1/namespaces/flink-operator-test/deployments/test-cluster")
+                .andReturn(
+                        HttpURLConnection.HTTP_CREATED,
+                        new EventBuilder().withNewMetadata().endMetadata().build())
+                .always();
+
+        mockServer
+                .expect()
+                .get()
+                .withPath(
+                        "/api/v1/namespaces/flink-operator-test/pods?labelSelector=app%3Dtest-cluster%2Ccomponent%3Djobmanager%2Ctype%3Dflink-native-kubernetes")
+                .andReturn(
+                        HttpURLConnection.HTTP_CREATED,
+                        new EventBuilder().withNewMetadata().endMetadata().build())
+                .always();
+
+        testController.reconcile(appCluster, TestUtils.createEmptyContext());
+        assertEquals(
+                appCluster.getSpec(),
+                appCluster.getStatus().getReconciliationStatus().getLastReconciledSpec());
+
+        flinkService.setPortReady(false);
+        testController.reconcile(appCluster, context);
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYING,
+                appCluster.getStatus().getJobManagerDeploymentStatus());
+
+        // trigger change
+        appCluster.getSpec().setServiceAccount(appCluster.getSpec().getServiceAccount() + "-2");
+
+        // Verify that even in DEPLOYING state we still redeploy
+        testController.reconcile(appCluster, context);
+        testController.reconcile(appCluster, context);
+        if (allowUpgrade) {
+            assertEquals(
+                    JobManagerDeploymentStatus.DEPLOYING,
+                    appCluster.getStatus().getJobManagerDeploymentStatus());
+            assertEquals(
+                    appCluster.getSpec(),
+                    appCluster.getStatus().getReconciliationStatus().getLastReconciledSpec());
+
+            flinkService.setPortReady(true);
+            testController.reconcile(appCluster, context);
+            testController.reconcile(appCluster, context);
+
+            if (appCluster.getSpec().getJob() != null) {
+                assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
+            } else {
+                assertNull(appCluster.getStatus().getJobStatus().getState());
+            }
+            assertEquals(
+                    JobManagerDeploymentStatus.READY,
+                    appCluster.getStatus().getJobManagerDeploymentStatus());
+        } else {
+            assertEquals(
+                    JobManagerDeploymentStatus.DEPLOYING,
+                    appCluster.getStatus().getJobManagerDeploymentStatus());
+            assertNotEquals(
+                    appCluster.getSpec(),
+                    appCluster.getStatus().getReconciliationStatus().getLastReconciledSpec());
+
+            flinkService.setPortReady(true);
+            testController.reconcile(appCluster, context);
+            testController.reconcile(appCluster, context);
+
+            assertEquals(
+                    JobManagerDeploymentStatus.DEPLOYING,
+                    appCluster.getStatus().getJobManagerDeploymentStatus());
+            assertEquals(
+                    appCluster.getSpec(),
+                    appCluster.getStatus().getReconciliationStatus().getLastReconciledSpec());
+
+            testController.reconcile(appCluster, context);
+            assertEquals(
+                    JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+                    appCluster.getStatus().getJobManagerDeploymentStatus());
+
+            testController.reconcile(appCluster, context);
+
+            assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
+            assertEquals(
+                    JobManagerDeploymentStatus.READY,
+                    appCluster.getStatus().getJobManagerDeploymentStatus());
+        }
+    }
+
+    @Test
     public void testPrepareEventSource() {
         // Test watch all
         testController.setControllerConfig(
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
index 0b4289f..4a3a425 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/ObserverTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
@@ -32,9 +33,7 @@ import io.javaoperatorsdk.operator.api.reconciler.Context;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** @link Observer unit tests */
 public class ObserverTest {
@@ -54,21 +53,19 @@ public class ObserverTest {
                 .getReconciliationStatus()
                 .setLastReconciledSpec(deployment.getSpec());
 
-        assertFalse(
-                observer.observe(
-                        deployment,
-                        readyContext,
-                        FlinkUtils.getEffectiveConfig(deployment, new Configuration())));
+        observer.observe(
+                deployment,
+                readyContext,
+                FlinkUtils.getEffectiveConfig(deployment, new Configuration()));
 
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
 
-        assertTrue(
-                observer.observe(
-                        deployment,
-                        readyContext,
-                        FlinkUtils.getEffectiveConfig(deployment, new Configuration())));
+        observer.observe(
+                deployment,
+                readyContext,
+                FlinkUtils.getEffectiveConfig(deployment, new Configuration()));
 
         assertEquals(
                 JobManagerDeploymentStatus.READY,
@@ -85,7 +82,7 @@ public class ObserverTest {
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
         Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
 
-        assertTrue(observer.observe(deployment, TestUtils.createEmptyContext(), conf));
+        observer.observe(deployment, TestUtils.createEmptyContext(), conf);
 
         deployment.setStatus(new FlinkDeploymentStatus());
         deployment
@@ -99,33 +96,35 @@ public class ObserverTest {
         flinkService.setPortReady(false);
 
         // Port not ready
-        assertFalse(observer.observe(deployment, readyContext, conf));
+        observer.observe(deployment, readyContext, conf);
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYING,
                 deployment.getStatus().getJobManagerDeploymentStatus());
 
-        assertFalse(observer.observe(deployment, readyContext, conf));
+        observer.observe(deployment, readyContext, conf);
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYING,
                 deployment.getStatus().getJobManagerDeploymentStatus());
 
         flinkService.setPortReady(true);
         // Port ready but we have to recheck once again
-        assertFalse(observer.observe(deployment, readyContext, conf));
+        observer.observe(deployment, readyContext, conf);
         assertEquals(
                 JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
 
         // Stable ready
-        assertTrue(observer.observe(deployment, readyContext, conf));
+        observer.observe(deployment, readyContext, conf);
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
+        assertEquals(JobState.RUNNING.name(), deployment.getStatus().getJobStatus().getState());
 
-        assertTrue(observer.observe(deployment, readyContext, conf));
+        observer.observe(deployment, readyContext, conf);
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
+        assertEquals(JobState.RUNNING.name(), deployment.getStatus().getJobStatus().getState());
 
         assertEquals(
                 deployment.getMetadata().getName(),
@@ -133,11 +132,11 @@ public class ObserverTest {
 
         // Test listing failure
         flinkService.clear();
-        assertFalse(observer.observe(deployment, readyContext, conf));
+        observer.observe(deployment, readyContext, conf);
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
-        assertEquals("UNKNOWN", deployment.getStatus().getJobStatus().getState());
+        assertEquals(Observer.JOB_STATE_UNKNOWN, deployment.getStatus().getJobStatus().getState());
     }
 
     @Test
@@ -151,7 +150,7 @@ public class ObserverTest {
         Configuration conf = FlinkUtils.getEffectiveConfig(deployment, new Configuration());
         flinkService.submitApplicationCluster(deployment, conf);
         bringToReadyStatus(deployment);
-        assertTrue(observer.observe(deployment, readyContext, conf));
+        observer.observe(deployment, readyContext, conf);
         assertEquals(
                 JobManagerDeploymentStatus.READY,
                 deployment.getStatus().getJobManagerDeploymentStatus());
@@ -160,7 +159,7 @@ public class ObserverTest {
         assertEquals(
                 "trigger_0",
                 deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
-        assertTrue(observer.observe(deployment, readyContext, conf));
+        observer.observe(deployment, readyContext, conf);
         assertEquals(
                 "savepoint_0",
                 deployment
@@ -175,7 +174,7 @@ public class ObserverTest {
         assertEquals(
                 "trigger_1",
                 deployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
-        assertTrue(observer.observe(deployment, readyContext, conf));
+        observer.observe(deployment, readyContext, conf);
         assertEquals(
                 "savepoint_1",
                 deployment
@@ -195,7 +194,7 @@ public class ObserverTest {
         JobStatus jobStatus = new JobStatus();
         jobStatus.setJobName("jobname");
         jobStatus.setJobId("0000000000");
-        jobStatus.setState("RUNNING");
+        jobStatus.setState(JobState.RUNNING.name());
         deployment.getStatus().setJobStatus(jobStatus);
         deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
     }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index c712501..40aaf3b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
@@ -176,5 +177,6 @@ public class JobReconcilerTest {
         jobStatus.setState("RUNNING");
 
         deployment.getStatus().setJobStatus(jobStatus);
+        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
index bfeefb6..31d7d25 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -33,6 +33,7 @@ import org.junit.Assert;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Consumer;
@@ -62,7 +63,10 @@ public class DeploymentValidatorTest {
                 dep -> dep.getSpec().getJob().setParallelism(-1),
                 "Job parallelism must be larger than 0");
         testError(
-                dep -> dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE),
+                dep -> {
+                    dep.getSpec().setFlinkConfiguration(new HashMap<>());
+                    dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+                },
                 "Job could not be upgraded with last-state while HA disabled");
 
         // Test conf validation
@@ -93,7 +97,10 @@ public class DeploymentValidatorTest {
                 "Invalid log config key");
 
         testError(
-                dep -> dep.getSpec().getJobManager().setReplicas(2),
+                dep -> {
+                    dep.getSpec().setFlinkConfiguration(new HashMap<>());
+                    dep.getSpec().getJobManager().setReplicas(2);
+                },
                 "High availability should be enabled when starting standby JobManagers.");
         testError(
                 dep -> dep.getSpec().getJobManager().setReplicas(0),

Reply via email to