This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new ab59d6e  [FLINK-27820] Handle deployment errors on observe
ab59d6e is described below

commit ab59d6eb980512775590d0d01e697fe0c28d1b3b
Author: Gyula Fora <[email protected]>
AuthorDate: Mon Jun 20 15:22:26 2022 +0200

    [FLINK-27820] Handle deployment errors on observe
---
 .../flink/kubernetes/operator/FlinkOperator.java   |   1 -
 .../operator/config/FlinkConfigManager.java        |   5 +-
 .../controller/FlinkDeploymentController.java      |   8 --
 .../kubernetes/operator/crd/FlinkDeployment.java   |   4 +-
 .../operator/crd/status/ReconciliationStatus.java  |  26 +++++-
 .../deployment/AbstractDeploymentObserver.java     |  61 +++++++++++-
 .../operator/reconciler/ReconciliationUtils.java   |  71 +++++++++-----
 .../kubernetes/operator/utils/FlinkUtils.java      |  16 ++++
 .../flink/kubernetes/operator/TestUtils.java       |  74 ++++-----------
 .../operator/config/FlinkConfigManagerTest.java    |  14 ++-
 .../operator/observer/SavepointObserverTest.java   |   2 +-
 .../deployment/ApplicationObserverTest.java        | 103 ++++++++++++++++-----
 .../observer/deployment/SessionObserverTest.java   |   8 +-
 .../operator/utils/ReconciliationUtilsTest.java    |  34 +++++--
 .../operator/utils/SavepointUtilsTest.java         |   4 +-
 .../operator/utils/StatusRecorderTest.java         |   2 +-
 .../operator/validation/DefaultValidatorTest.java  |   8 +-
 17 files changed, 297 insertions(+), 144 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index d5313d4..cfbb450 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -122,7 +122,6 @@ public class FlinkOperator {
         var controller =
                 new FlinkDeploymentController(
                         configManager,
-                        client,
                         validators,
                         reconcilerFactory,
                         observerFactory,
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
index c8e4bc3..8f67978 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 
 import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
 import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
@@ -144,7 +145,9 @@ public class FlinkConfigManager {
     }
 
     public Configuration getDeployConfig(ObjectMeta objectMeta, 
FlinkDeploymentSpec spec) {
-        return getConfig(objectMeta, spec);
+        var conf = getConfig(objectMeta, spec);
+        FlinkUtils.setGenerationAnnotation(conf, objectMeta.getGeneration());
+        return conf;
     }
 
     public Configuration getObserveConfig(FlinkDeployment deployment) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
index df69871..346bbe3 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java
@@ -18,7 +18,6 @@
 package org.apache.flink.kubernetes.operator.controller;
 
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
@@ -35,7 +34,6 @@ import org.apache.flink.kubernetes.operator.utils.EventUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
 
-import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
 import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
@@ -53,7 +51,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 
 /** Controller that runs the main reconcile loop for Flink deployments. */
 @ControllerConfiguration()
@@ -65,7 +62,6 @@ public class FlinkDeploymentController
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkDeploymentController.class);
 
     private final FlinkConfigManager configManager;
-    private final KubernetesClient kubernetesClient;
 
     private final Set<FlinkResourceValidator> validators;
     private final ReconcilerFactory reconcilerFactory;
@@ -73,12 +69,9 @@ public class FlinkDeploymentController
     private final MetricManager<FlinkDeployment> metricManager;
     private final StatusRecorder<FlinkDeploymentStatus> statusRecorder;
     private final EventRecorder eventRecorder;
-    private final ConcurrentHashMap<Tuple2<String, String>, 
FlinkDeploymentStatus> statusCache =
-            new ConcurrentHashMap<>();
 
     public FlinkDeploymentController(
             FlinkConfigManager configManager,
-            KubernetesClient kubernetesClient,
             Set<FlinkResourceValidator> validators,
             ReconcilerFactory reconcilerFactory,
             ObserverFactory observerFactory,
@@ -86,7 +79,6 @@ public class FlinkDeploymentController
             StatusRecorder<FlinkDeploymentStatus> statusRecorder,
             EventRecorder eventRecorder) {
         this.configManager = configManager;
-        this.kubernetesClient = kubernetesClient;
         this.validators = validators;
         this.reconcilerFactory = reconcilerFactory;
         this.observerFactory = observerFactory;
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
index 187319d..04ed569 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/FlinkDeployment.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.crd;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 
@@ -39,8 +40,9 @@ public class FlinkDeployment
         extends AbstractFlinkResource<FlinkDeploymentSpec, 
FlinkDeploymentStatus>
         implements Namespaced {
 
+    @VisibleForTesting
     @Override
-    protected FlinkDeploymentStatus initStatus() {
+    public FlinkDeploymentStatus initStatus() {
         return new FlinkDeploymentStatus();
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
index 0bfe6a4..1d06e90 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/status/ReconciliationStatus.java
@@ -18,10 +18,13 @@
 package org.apache.flink.kubernetes.operator.crd.status;
 
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.databind.node.ObjectNode;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
@@ -47,24 +50,37 @@ public abstract class ReconciliationStatus<SPEC extends 
AbstractFlinkSpec> {
     private String lastStableSpec;
 
     /** Deployment state of the last reconciled spec. */
-    private ReconciliationState state = ReconciliationState.DEPLOYED;
+    private ReconciliationState state = ReconciliationState.UPGRADING;
 
     @JsonIgnore
     public abstract Class<SPEC> getSpecClass();
 
     @JsonIgnore
     public SPEC deserializeLastReconciledSpec() {
-        return 
ReconciliationUtils.deserializedSpecWithVersion(lastReconciledSpec, 
getSpecClass());
+        var specWithMeta = deserializeLastReconciledSpecWithMeta();
+        return specWithMeta != null ? specWithMeta.f0 : null;
     }
 
     @JsonIgnore
     public SPEC deserializeLastStableSpec() {
-        return ReconciliationUtils.deserializedSpecWithVersion(lastStableSpec, 
getSpecClass());
+        var specWithMeta = deserializeLastStableSpecWithMeta();
+        return specWithMeta != null ? specWithMeta.f0 : null;
     }
 
     @JsonIgnore
-    public void serializeAndSetLastReconciledSpec(SPEC spec) {
-        
setLastReconciledSpec(ReconciliationUtils.writeSpecWithCurrentVersion(spec));
+    public Tuple2<SPEC, ObjectNode> deserializeLastReconciledSpecWithMeta() {
+        return ReconciliationUtils.deserializeSpecWithMeta(lastReconciledSpec, 
getSpecClass());
+    }
+
+    @JsonIgnore
+    public Tuple2<SPEC, ObjectNode> deserializeLastStableSpecWithMeta() {
+        return ReconciliationUtils.deserializeSpecWithMeta(lastStableSpec, 
getSpecClass());
+    }
+
+    @JsonIgnore
+    public void serializeAndSetLastReconciledSpec(
+            SPEC spec, AbstractFlinkResource<SPEC, ?> resource) {
+        setLastReconciledSpec(ReconciliationUtils.writeSpecWithMeta(spec, 
resource));
     }
 
     public void markReconciledSpecAsStable() {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
index d3217f3..4a2d08c 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.EventUtils;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 
 import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
@@ -75,15 +76,19 @@ public abstract class AbstractDeploymentObserver implements 
Observer<FlinkDeploy
     public void observe(FlinkDeployment flinkApp, Context context) {
         var status = flinkApp.getStatus();
         var reconciliationStatus = status.getReconciliationStatus();
-        var lastReconciledSpec = 
reconciliationStatus.deserializeLastReconciledSpec();
 
         // Nothing has been launched so skip observing
-        if (lastReconciledSpec == null
-                || reconciliationStatus.getState() == 
ReconciliationState.ROLLING_BACK
-                || reconciliationStatus.getState() == 
ReconciliationState.UPGRADING) {
+        if (reconciliationStatus.getState() == 
ReconciliationState.ROLLING_BACK) {
             return;
         }
 
+        if (reconciliationStatus.getState() == ReconciliationState.UPGRADING) {
+            checkIfAlreadyUpgraded(flinkApp, context);
+            if (reconciliationStatus.getState() == 
ReconciliationState.UPGRADING) {
+                return;
+            }
+        }
+
         Configuration observeConfig = configManager.getObserveConfig(flinkApp);
         if (!isJmDeploymentReady(flinkApp)) {
             observeJmDeployment(flinkApp, context, observeConfig);
@@ -252,6 +257,54 @@ public abstract class AbstractDeploymentObserver 
implements Observer<FlinkDeploy
                 EventUtils.Component.JobManagerDeployment);
     }
 
+    /**
+     * Checks a deployment that is currently in the UPGRADING state whether it 
was already deployed
+     * but we simply miss the status information. After comparing the target 
resource generation
+     * with the one from the possible deployment if they match we update the 
status to the already
+     * DEPLOYED state.
+     *
+     * @param flinkDep Flink resource to check.
+     * @param context Context for reconciliation.
+     */
+    private void checkIfAlreadyUpgraded(FlinkDeployment flinkDep, Context 
context) {
+        Optional<Deployment> depOpt = 
context.getSecondaryResource(Deployment.class);
+        var status = flinkDep.getStatus();
+        depOpt.ifPresent(
+                deployment -> {
+                    Map<String, String> annotations = 
deployment.getMetadata().getAnnotations();
+                    if (annotations == null) {
+                        return;
+                    }
+                    Long deployedGeneration =
+                            
Optional.ofNullable(annotations.get(FlinkUtils.CR_GENERATION_LABEL))
+                                    .map(Long::valueOf)
+                                    .orElse(-1L);
+
+                    Long upgradeTargetGeneration =
+                            
ReconciliationUtils.getUpgradeTargetGeneration(flinkDep);
+
+                    if (deployedGeneration.equals(upgradeTargetGeneration)) {
+                        logger.info(
+                                "Last reconciled generation is already 
deployed, setting reconciliation status to "
+                                        + ReconciliationState.DEPLOYED);
+
+                        var firstDeploy =
+                                
status.getReconciliationStatus().getLastReconciledSpec() == null;
+                        var conf =
+                                firstDeploy
+                                        ? configManager.getDeployConfig(
+                                                flinkDep.getMetadata(), 
flinkDep.getSpec())
+                                        : 
configManager.getObserveConfig(flinkDep);
+
+                        ReconciliationUtils.updateForSpecReconciliationSuccess(
+                                flinkDep, JobState.RUNNING, conf);
+                        status.getJobStatus()
+                                
.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
+                        
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+                    }
+                });
+    }
+
     /**
      * Observe the flinkApp status when the cluster is ready. It will be 
implemented by child class
      * to reflect the changed status on the flinkApp resource.
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 85bdf52..ff59b7d 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -17,12 +17,12 @@
 
 package org.apache.flink.kubernetes.operator.reconciler;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
-import org.apache.flink.kubernetes.operator.crd.CrdConstants;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
@@ -46,7 +46,6 @@ import org.apache.flink.util.Preconditions;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.fasterxml.jackson.databind.node.TextNode;
 import io.fabric8.kubernetes.client.CustomResource;
 import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
 import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
@@ -65,6 +64,8 @@ public class ReconciliationUtils {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(ReconciliationUtils.class);
 
+    public static final String INTERNAL_METADATA_JSON_KEY = 
"resource_metadata";
+
     private static final ObjectMapper objectMapper = new ObjectMapper();
 
     public static <SPEC extends AbstractFlinkSpec> void 
updateForSpecReconciliationSuccess(
@@ -97,7 +98,7 @@ public class ReconciliationUtils {
             }
             clonedSpec.getJob().setState(stateAfterReconcile);
         }
-        reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec);
+        reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec, 
target);
         
reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
         reconciliationStatus.setState(
                 upgrading ? ReconciliationState.UPGRADING : 
ReconciliationState.DEPLOYED);
@@ -120,10 +121,12 @@ public class ReconciliationUtils {
         var spec = target.getSpec();
         var reconciliationStatus = commonStatus.getReconciliationStatus();
         var lastReconciledSpec = 
reconciliationStatus.deserializeLastReconciledSpec();
+
         lastReconciledSpec
                 .getJob()
                 
.setSavepointTriggerNonce(spec.getJob().getSavepointTriggerNonce());
-        
reconciliationStatus.serializeAndSetLastReconciledSpec(lastReconciledSpec);
+
+        
reconciliationStatus.serializeAndSetLastReconciledSpec(lastReconciledSpec, 
target);
         
reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
     }
 
@@ -209,12 +212,7 @@ public class ReconciliationUtils {
         var reconciliationStatus = 
deployment.getStatus().getReconciliationStatus();
         var reconciliationState = reconciliationStatus.getState();
         if (reconciliationState != ReconciliationState.ROLLED_BACK) {
-            var lastReconciledSpec = 
reconciliationStatus.deserializeLastReconciledSpec();
-            if (lastReconciledSpec == null) {
-                return null;
-            } else {
-                return lastReconciledSpec;
-            }
+            return reconciliationStatus.deserializeLastReconciledSpec();
         } else {
             return reconciliationStatus.deserializeLastStableSpec();
         }
@@ -229,26 +227,41 @@ public class ReconciliationUtils {
                 || currentReconState == ReconciliationState.UPGRADING;
     }
 
-    public static <T> T deserializedSpecWithVersion(
-            @Nullable String specString, Class<T> specClass) {
-        if (specString == null) {
+    public static <T> Tuple2<T, ObjectNode> deserializeSpecWithMeta(
+            @Nullable String specWithMetaString, Class<T> specClass) {
+        if (specWithMetaString == null) {
             return null;
         }
 
         try {
-            ObjectNode objectNode = (ObjectNode) 
objectMapper.readTree(specString);
-            objectNode.remove("apiVersion");
-            return objectMapper.treeToValue(objectNode, specClass);
+            ObjectNode wrapper = (ObjectNode) 
objectMapper.readTree(specWithMetaString);
+            ObjectNode internalMeta = (ObjectNode) 
wrapper.remove(INTERNAL_METADATA_JSON_KEY);
+
+            if (internalMeta == null) {
+                // migrating from old format
+                wrapper.remove("apiVersion");
+                return Tuple2.of(objectMapper.treeToValue(wrapper, specClass), 
internalMeta);
+            } else {
+                return Tuple2.of(
+                        objectMapper.treeToValue(wrapper.get("spec"), 
specClass), internalMeta);
+            }
         } catch (JsonProcessingException e) {
             throw new RuntimeException("Could not deserialize spec, this 
indicates a bug...", e);
         }
     }
 
-    public static String writeSpecWithCurrentVersion(Object spec) {
-        ObjectNode objectNode = 
objectMapper.valueToTree(Preconditions.checkNotNull(spec));
-        objectNode.set("apiVersion", new TextNode(CrdConstants.API_VERSION));
+    public static String writeSpecWithMeta(
+            Object spec, AbstractFlinkResource<?, ?> relatedResource) {
+        ObjectNode wrapper = objectMapper.createObjectNode();
+        wrapper.set("spec", 
objectMapper.valueToTree(Preconditions.checkNotNull(spec)));
+
+        ObjectNode internalMeta = 
wrapper.putObject(INTERNAL_METADATA_JSON_KEY);
+        internalMeta.put("apiVersion", relatedResource.getApiVersion());
+        ObjectNode metadata = internalMeta.putObject("metadata");
+        metadata.put("generation", 
relatedResource.getMetadata().getGeneration());
+
         try {
-            return objectMapper.writeValueAsString(objectNode);
+            return objectMapper.writeValueAsString(wrapper);
         } catch (JsonProcessingException e) {
             throw new RuntimeException("Could not serialize spec, this 
indicates a bug...", e);
         }
@@ -269,7 +282,7 @@ public class ReconciliationUtils {
             return false;
         }
 
-        FlinkDeploymentSpec lastStableSpec = 
reconciliationStatus.deserializeLastStableSpec();
+        var lastStableSpec = reconciliationStatus.deserializeLastStableSpec();
         if (lastStableSpec != null
                 && lastStableSpec.getJob() != null
                 && lastStableSpec.getJob().getState() == JobState.SUSPENDED) {
@@ -401,4 +414,20 @@ public class ReconciliationUtils {
         // Status was updated already, no need to return anything
         return ErrorStatusUpdateControl.noStatusUpdate();
     }
+
+    public static Long getUpgradeTargetGeneration(FlinkDeployment deployment) {
+        var lastSpecWithMeta =
+                deployment
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .deserializeLastReconciledSpecWithMeta();
+
+        if (lastSpecWithMeta == null || lastSpecWithMeta.f1 == null) {
+            // For first deployments and when migrating from before this 
feature simply return
+            // current generation
+            return deployment.getMetadata().getGeneration();
+        }
+
+        return 
lastSpecWithMeta.f1.get("metadata").get("generation").asLong(-1L);
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index f672d65..5d0cc6b 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -46,6 +46,8 @@ import io.fabric8.kubernetes.client.KubernetesClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.function.Predicate;
@@ -58,6 +60,8 @@ public class FlinkUtils {
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkUtils.class);
     private static final ObjectMapper MAPPER = new ObjectMapper();
 
+    public static final String CR_GENERATION_LABEL = 
"flinkdeployment.flink.apache.org/generation";
+
     public static Pod mergePodTemplates(Pod toPod, Pod fromPod) {
         if (fromPod == null) {
             return toPod;
@@ -286,4 +290,16 @@ public class FlinkUtils {
         int taskSlots = conf.get(TaskManagerOptions.NUM_TASK_SLOTS);
         return (parallelism + taskSlots - 1) / taskSlots;
     }
+
+    public static void setGenerationAnnotation(Configuration conf, Long 
generation) {
+        if (generation == null) {
+            return;
+        }
+        var labels =
+                new HashMap<>(
+                        
conf.getOptional(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS)
+                                .orElse(Collections.emptyMap()));
+        labels.put(CR_GENERATION_LABEL, generation.toString());
+        conf.set(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS, labels);
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index c619633..4f429c3 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -49,6 +49,7 @@ import org.apache.flink.metrics.testutils.MetricListener;
 import io.fabric8.kubernetes.api.model.Container;
 import io.fabric8.kubernetes.api.model.ContainerStatus;
 import io.fabric8.kubernetes.api.model.ContainerStatusBuilder;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodList;
@@ -69,6 +70,8 @@ import okhttp3.Headers;
 import okhttp3.mockwebserver.RecordedRequest;
 import org.junit.jupiter.api.Assertions;
 
+import javax.annotation.Nullable;
+
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -131,6 +134,7 @@ public class TestUtils {
                                 .upgradeMode(UpgradeMode.STATELESS)
                                 .state(JobState.RUNNING)
                                 .build());
+        deployment.setStatus(deployment.initStatus());
         return deployment;
     }
 
@@ -211,35 +215,6 @@ public class TestUtils {
         return new PodListBuilder().withItems(pod).build();
     }
 
-    public static Context createEmptyContext() {
-        return new Context() {
-            @Override
-            public Optional<RetryInfo> getRetryInfo() {
-                return Optional.empty();
-            }
-
-            @Override
-            public Optional getSecondaryResource(Class aClass, String s) {
-                return Optional.empty();
-            }
-
-            @Override
-            public Set getSecondaryResources(Class expectedType) {
-                return null;
-            }
-
-            @Override
-            public ControllerConfiguration getControllerConfiguration() {
-                return null;
-            }
-
-            @Override
-            public ManagedDependentResourceContext 
managedDependentResourceContext() {
-                return null;
-            }
-        };
-    }
-
     public static Deployment createDeployment(boolean ready) {
         DeploymentStatus status = new DeploymentStatus();
         status.setAvailableReplicas(ready ? 1 : 0);
@@ -247,12 +222,13 @@ public class TestUtils {
         DeploymentSpec spec = new DeploymentSpec();
         spec.setReplicas(1);
         Deployment deployment = new Deployment();
+        deployment.setMetadata(new ObjectMeta());
         deployment.setSpec(spec);
         deployment.setStatus(status);
         return deployment;
     }
 
-    public static Context createContextWithReadyJobManagerDeployment() {
+    public static Context createContextWithDeployment(@Nullable Deployment 
deployment) {
         return new Context() {
             @Override
             public Optional<RetryInfo> getRetryInfo() {
@@ -261,7 +237,7 @@ public class TestUtils {
 
             @Override
             public Optional getSecondaryResource(Class expectedType, String 
eventSourceName) {
-                return Optional.of(createDeployment(true));
+                return Optional.ofNullable(deployment);
             }
 
             @Override
@@ -281,33 +257,16 @@ public class TestUtils {
         };
     }
 
-    public static Context createContextWithInProgressDeployment() {
-        return new Context() {
-            @Override
-            public Optional<RetryInfo> getRetryInfo() {
-                return Optional.empty();
-            }
-
-            @Override
-            public Optional getSecondaryResource(Class expectedType, String 
eventSourceName) {
-                return Optional.of(createDeployment(false));
-            }
-
-            @Override
-            public Set getSecondaryResources(Class expectedType) {
-                return null;
-            }
+    public static Context createEmptyContext() {
+        return createContextWithDeployment(null);
+    }
 
-            @Override
-            public ControllerConfiguration getControllerConfiguration() {
-                return null;
-            }
+    public static Context createContextWithReadyJobManagerDeployment() {
+        return createContextWithDeployment(createDeployment(true));
+    }
 
-            @Override
-            public ManagedDependentResourceContext 
managedDependentResourceContext() {
-                return null;
-            }
-        };
+    public static Context createContextWithInProgressDeployment() {
+        return createContextWithDeployment(createDeployment(false));
     }
 
     public static Context createContextWithReadyFlinkDeployment() {
@@ -329,7 +288,7 @@ public class TestUtils {
                 
session.getSpec().getFlinkConfiguration().putAll(flinkDepConfig);
                 session.getStatus()
                         .getReconciliationStatus()
-                        .serializeAndSetLastReconciledSpec(session.getSpec());
+                        .serializeAndSetLastReconciledSpec(session.getSpec(), 
session);
                 return Optional.of(session);
             }
 
@@ -482,7 +441,6 @@ public class TestUtils {
         var eventRecorder = new EventRecorder(kubernetesClient, (r, e) -> {});
         return new FlinkDeploymentController(
                 configManager,
-                kubernetesClient,
                 ValidatorUtils.discoverValidators(configManager),
                 new ReconcilerFactory(kubernetesClient, flinkService, 
configManager, eventRecorder),
                 new ObserverFactory(flinkService, configManager, 
statusRecorder, eventRecorder),
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
index 5be498b..6321c92 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManagerTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.utils.Constants;
 
 import io.fabric8.kubernetes.api.model.Pod;
@@ -61,7 +62,7 @@ public class FlinkConfigManagerTest {
                 deployment.getStatus().getReconciliationStatus();
 
         deployment.getSpec().getFlinkConfiguration().put(testConf.key(), 
"reconciled");
-        
reconciliationStatus.serializeAndSetLastReconciledSpec(deployment.getSpec());
+        
reconciliationStatus.serializeAndSetLastReconciledSpec(deployment.getSpec(), 
deployment);
         reconciliationStatus.markReconciledSpecAsStable();
 
         deployment.getSpec().getFlinkConfiguration().put(testConf.key(), 
"latest");
@@ -73,14 +74,21 @@ public class FlinkConfigManagerTest {
         assertEquals("reconciled", 
configManager.getObserveConfig(deployment).get(testConf));
 
         deployment.getSpec().getFlinkConfiguration().put(testConf.key(), 
"stable");
-        
reconciliationStatus.serializeAndSetLastReconciledSpec(deployment.getSpec());
+        
reconciliationStatus.serializeAndSetLastReconciledSpec(deployment.getSpec(), 
deployment);
         reconciliationStatus.markReconciledSpecAsStable();
 
         deployment.getSpec().getFlinkConfiguration().put(testConf.key(), 
"rolled-back");
-        
reconciliationStatus.serializeAndSetLastReconciledSpec(deployment.getSpec());
+        
reconciliationStatus.serializeAndSetLastReconciledSpec(deployment.getSpec(), 
deployment);
         reconciliationStatus.setState(ReconciliationState.ROLLED_BACK);
 
         assertEquals("stable", 
configManager.getObserveConfig(deployment).get(testConf));
+
+        deployment.getMetadata().setGeneration(5L);
+        var deployConfig =
+                configManager.getDeployConfig(deployment.getMetadata(), 
deployment.getSpec());
+        assertEquals(
+                Map.of(FlinkUtils.CR_GENERATION_LABEL, "5"),
+                
deployConfig.get(KubernetesConfigOptions.JOB_MANAGER_ANNOTATIONS));
     }
 
     @Test
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
index 946f7b5..5182ffa 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/SavepointObserverTest.java
@@ -138,7 +138,7 @@ public class SavepointObserverTest {
         deployment
                 .getStatus()
                 .getReconciliationStatus()
-                .serializeAndSetLastReconciledSpec(deployment.getSpec());
+                .serializeAndSetLastReconciledSpec(deployment.getSpec(), 
deployment);
 
         var jobStatus = deployment.getStatus().getJobStatus();
         jobStatus.setState("RUNNING");
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index b9e30f8..f7ac717 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -26,13 +26,14 @@ import 
org.apache.flink.kubernetes.operator.TestingStatusRecorder;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
-import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
 import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 
@@ -45,9 +46,11 @@ import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
 import java.time.Instant;
+import java.util.HashMap;
 import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -73,20 +76,15 @@ public class ApplicationObserverTest {
     @Test
     public void observeApplicationCluster() throws Exception {
         FlinkDeployment deployment = TestUtils.buildApplicationCluster();
-        deployment.setStatus(new FlinkDeploymentStatus());
-
         Configuration conf =
                 configManager.getDeployConfig(deployment.getMetadata(), 
deployment.getSpec());
 
         observer.observe(deployment, TestUtils.createEmptyContext());
         
assertNull(deployment.getStatus().getReconciliationStatus().getLastStableSpec());
 
-        deployment
-                .getStatus()
-                .getReconciliationStatus()
-                .serializeAndSetLastReconciledSpec(deployment.getSpec());
-        deployment.getStatus().setJobStatus(new JobStatus());
         flinkService.submitApplicationCluster(deployment.getSpec().getJob(), 
conf, false);
+        ReconciliationUtils.updateForSpecReconciliationSuccess(
+                deployment, JobState.RUNNING, new Configuration());
 
         // Validate port check logic
         flinkService.setPortReady(false);
@@ -180,11 +178,9 @@ public class ApplicationObserverTest {
                 configManager.getDeployConfig(deployment.getMetadata(), 
deployment.getSpec());
         flinkService.submitApplicationCluster(deployment.getSpec().getJob(), 
conf, false);
 
-        deployment.setStatus(new FlinkDeploymentStatus());
-        deployment
-                .getStatus()
-                .getReconciliationStatus()
-                .serializeAndSetLastReconciledSpec(deployment.getSpec());
+        deployment.setStatus(deployment.initStatus());
+        ReconciliationUtils.updateForSpecReconciliationSuccess(
+                deployment, JobState.RUNNING, new Configuration());
         
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
 
         observer.observe(deployment, readyContext);
@@ -216,11 +212,9 @@ public class ApplicationObserverTest {
                 configManager.getDeployConfig(deployment.getMetadata(), 
deployment.getSpec());
         flinkService.submitApplicationCluster(deployment.getSpec().getJob(), 
conf, false);
 
-        deployment.setStatus(new FlinkDeploymentStatus());
-        deployment
-                .getStatus()
-                .getReconciliationStatus()
-                .serializeAndSetLastReconciledSpec(deployment.getSpec());
+        deployment.setStatus(deployment.initStatus());
+        ReconciliationUtils.updateForSpecReconciliationSuccess(
+                deployment, JobState.RUNNING, new Configuration());
         
deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
 
         observer.observe(deployment, readyContext);
@@ -418,10 +412,8 @@ public class ApplicationObserverTest {
     }
 
     private void bringToReadyStatus(FlinkDeployment deployment) {
-        deployment
-                .getStatus()
-                .getReconciliationStatus()
-                .serializeAndSetLastReconciledSpec(deployment.getSpec());
+        ReconciliationUtils.updateForSpecReconciliationSuccess(
+                deployment, JobState.RUNNING, new Configuration());
         JobStatus jobStatus = new JobStatus();
         jobStatus.setJobName("jobname");
         jobStatus.setJobId("0000000000");
@@ -451,4 +443,71 @@ public class ApplicationObserverTest {
                         });
         assertEquals(podFailedMessage, exception.getMessage());
     }
+
+    @Test
+    public void observeAlreadyUpgraded() {
+        var kubernetesDeployment = TestUtils.createDeployment(true);
+        kubernetesDeployment.getMetadata().setAnnotations(new HashMap<>());
+
+        var context = 
TestUtils.createContextWithDeployment(kubernetesDeployment);
+
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        deployment.getMetadata().setGeneration(123L);
+
+        var status = deployment.getStatus();
+        var reconStatus = status.getReconciliationStatus();
+
+        // New deployment
+        assertEquals(ReconciliationState.UPGRADING, reconStatus.getState());
+        assertNull(reconStatus.getLastReconciledSpec());
+
+        kubernetesDeployment.getMetadata().setGeneration(1L);
+        observer.observe(deployment, context);
+
+        assertEquals(ReconciliationState.UPGRADING, reconStatus.getState());
+        assertNull(reconStatus.getLastReconciledSpec());
+
+        kubernetesDeployment
+                .getMetadata()
+                .getAnnotations()
+                .put(FlinkUtils.CR_GENERATION_LABEL, "123");
+        observer.observe(deployment, context);
+
+        assertEquals(ReconciliationState.DEPLOYED, reconStatus.getState());
+        assertNotNull(reconStatus.getLastReconciledSpec());
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+                status.getJobManagerDeploymentStatus());
+
+        deployment.getMetadata().setGeneration(321L);
+
+        ReconciliationUtils.updateForSpecReconciliationSuccess(
+                deployment,
+                JobState.SUSPENDED,
+                new FlinkConfigManager(new Configuration())
+                        .getDeployConfig(deployment.getMetadata(), 
deployment.getSpec()));
+        status = deployment.getStatus();
+        reconStatus = status.getReconciliationStatus();
+
+        assertEquals(status.getReconciliationStatus().getState(), 
ReconciliationState.UPGRADING);
+        
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+
+        observer.observe(deployment, TestUtils.createEmptyContext());
+        assertEquals(JobManagerDeploymentStatus.MISSING, 
status.getJobManagerDeploymentStatus());
+
+        observer.observe(deployment, context);
+        assertEquals(JobManagerDeploymentStatus.MISSING, 
status.getJobManagerDeploymentStatus());
+
+        kubernetesDeployment
+                .getMetadata()
+                .getAnnotations()
+                .put(FlinkUtils.CR_GENERATION_LABEL, "321");
+        observer.observe(deployment, context);
+
+        assertEquals(ReconciliationState.DEPLOYED, reconStatus.getState());
+        assertNotNull(reconStatus.getLastReconciledSpec());
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYED_NOT_READY,
+                status.getJobManagerDeploymentStatus());
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
index 515cdb8..ba0df3a 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserverTest.java
@@ -22,7 +22,9 @@ import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 
 import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -42,10 +44,8 @@ public class SessionObserverTest {
         FlinkDeployment deployment = TestUtils.buildSessionCluster();
         SessionObserver observer =
                 new SessionObserver(flinkService, configManager, new 
EventRecorder(null, null));
-        deployment
-                .getStatus()
-                .getReconciliationStatus()
-                .serializeAndSetLastReconciledSpec(deployment.getSpec());
+        ReconciliationUtils.updateForSpecReconciliationSuccess(
+                deployment, JobState.RUNNING, new Configuration());
 
         observer.observe(deployment, readyContext);
         
assertNull(deployment.getStatus().getReconciliationStatus().getLastStableSpec());
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
index 3b3f4b2..3f37c18 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
@@ -20,10 +20,10 @@ package org.apache.flink.kubernetes.operator.utils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
-import org.apache.flink.kubernetes.operator.crd.CrdConstants;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -35,6 +35,7 @@ import org.junit.jupiter.api.Test;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 /** Test for {@link 
org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils}. */
 public class ReconciliationUtilsTest {
@@ -46,11 +47,11 @@ public class ReconciliationUtilsTest {
     public void testRescheduleUpgradeImmediately() {
         FlinkDeployment app = TestUtils.buildApplicationCluster();
         app.getSpec().getJob().setState(JobState.RUNNING);
+        
app.getStatus().getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
+        ReconciliationUtils.updateForSpecReconciliationSuccess(
+                app, JobState.RUNNING, new Configuration());
         FlinkDeployment previous = ReconciliationUtils.clone(app);
         FlinkDeployment current = ReconciliationUtils.clone(app);
-        current.getStatus()
-                .getReconciliationStatus()
-                
.serializeAndSetLastReconciledSpec(ReconciliationUtils.clone(current.getSpec()));
         ReconciliationUtils.updateForSpecReconciliationSuccess(
                 current, JobState.SUSPENDED, new Configuration());
 
@@ -72,12 +73,29 @@ public class ReconciliationUtilsTest {
     @Test
     public void testSpecSerializationWithVersion() throws 
JsonProcessingException {
         FlinkDeployment app = TestUtils.buildApplicationCluster();
-        String serialized = 
ReconciliationUtils.writeSpecWithCurrentVersion(app.getSpec());
+        app.getMetadata().setGeneration(12L);
+        String serialized = 
ReconciliationUtils.writeSpecWithMeta(app.getSpec(), app);
         ObjectNode node = (ObjectNode) new ObjectMapper().readTree(serialized);
-        assertEquals(CrdConstants.API_VERSION, 
node.get("apiVersion").asText());
+
+        ObjectNode internalMeta =
+                (ObjectNode) 
node.get(ReconciliationUtils.INTERNAL_METADATA_JSON_KEY);
+        assertEquals("flink.apache.org/v1beta1", 
internalMeta.get("apiVersion").asText());
+        assertEquals(12L, 
internalMeta.get("metadata").get("generation").asLong());
         assertEquals(
                 app.getSpec(),
-                ReconciliationUtils.deserializedSpecWithVersion(
-                        serialized, FlinkDeploymentSpec.class));
+                ReconciliationUtils.deserializeSpecWithMeta(serialized, 
FlinkDeploymentSpec.class)
+                        .f0);
+
+        // test backward compatibility
+        String oldSerialized =
+                
"{\"job\":{\"jarURI\":\"local:///opt/flink/examples/streaming/StateMachineExample.jar\",\"parallelism\":2,\"entryClass\":null,\"args\":[],\"state\":\"running\",\"savepointTriggerNonce\":null,\"initialSavepointPath\":null,\"upgradeMode\":\"stateless\",\"allowNonRestoredState\":null},\"restartNonce\":null,\"flinkConfiguration\":{\"taskmanager.numberOfTaskSlots\":\"2\"},\"image\":\"flink:1.15\",\"imagePullPolicy\":null,\"serviceAccount\":\"flink\",\"flinkVersion\":\"v1_15\",
 [...]
+
+        var migrated =
+                ReconciliationUtils.deserializeSpecWithMeta(
+                        oldSerialized, FlinkDeploymentSpec.class);
+        assertEquals(
+                
"local:///opt/flink/examples/streaming/StateMachineExample.jar",
+                migrated.f0.getJob().getJarURI());
+        assertNull(migrated.f1);
     }
 }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SavepointUtilsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SavepointUtilsTest.java
index ee004a0..f369037 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SavepointUtilsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/SavepointUtilsTest.java
@@ -53,7 +53,7 @@ public class SavepointUtilsTest {
         deployment
                 .getStatus()
                 .getReconciliationStatus()
-                .serializeAndSetLastReconciledSpec(deployment.getSpec());
+                .serializeAndSetLastReconciledSpec(deployment.getSpec(), 
deployment);
 
         assertEquals(
                 Optional.empty(),
@@ -67,7 +67,7 @@ public class SavepointUtilsTest {
         deployment
                 .getStatus()
                 .getReconciliationStatus()
-                .serializeAndSetLastReconciledSpec(deployment.getSpec());
+                .serializeAndSetLastReconciledSpec(deployment.getSpec(), 
deployment);
 
         assertEquals(
                 Optional.of(SavepointTriggerType.PERIODIC),
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
index 209c1d2..6724005 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/StatusRecorderTest.java
@@ -45,7 +45,7 @@ public class StatusRecorderTest {
         helper.patchAndCacheStatus(deployment);
         assertTrue(mockServer.getLastRequest() != lastRequest);
         lastRequest = mockServer.getLastRequest();
-        
deployment.getStatus().getReconciliationStatus().setState(ReconciliationState.UPGRADING);
+        
deployment.getStatus().getReconciliationStatus().setState(ReconciliationState.ROLLING_BACK);
         helper.patchAndCacheStatus(deployment);
 
         // We intentionally compare references
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index d0dce8f..e0c929c 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -257,7 +257,7 @@ public class DefaultValidatorTest {
 
                     dep.getStatus()
                             .getReconciliationStatus()
-                            .serializeAndSetLastReconciledSpec(spec);
+                            .serializeAndSetLastReconciledSpec(spec, dep);
 
                     dep.getSpec()
                             .getFlinkConfiguration()
@@ -278,7 +278,7 @@ public class DefaultValidatorTest {
                     dep.getStatus()
                             .getReconciliationStatus()
                             .serializeAndSetLastReconciledSpec(
-                                    ReconciliationUtils.clone(dep.getSpec()));
+                                    ReconciliationUtils.clone(dep.getSpec()), 
dep);
                     dep.getSpec().setJob(null);
                 },
                 "Cannot switch from job to session cluster");
@@ -294,7 +294,7 @@ public class DefaultValidatorTest {
                     spec.setJob(null);
                     dep.getStatus()
                             .getReconciliationStatus()
-                            .serializeAndSetLastReconciledSpec(spec);
+                            .serializeAndSetLastReconciledSpec(spec, dep);
                 },
                 "Cannot switch from session to job cluster");
 
@@ -316,7 +316,7 @@ public class DefaultValidatorTest {
 
                     dep.getStatus()
                             .getReconciliationStatus()
-                            .serializeAndSetLastReconciledSpec(spec);
+                            .serializeAndSetLastReconciledSpec(spec, dep);
                     
dep.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
                 },
                 String.format(

Reply via email to