This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new f4b196f  [FLINK-28350] Always use savepoint when switching between Flink versions
f4b196f is described below

commit f4b196f334f8adcaa3fb5d3d41a5b079675ef610
Author: Gyula Fora <[email protected]>
AuthorDate: Mon Jul 4 21:39:36 2022 +0200

    [FLINK-28350] Always use savepoint when switching between Flink versions
---
 .../observer/sessionjob/SessionJobObserver.java    |  5 +---
 .../AbstractFlinkResourceReconciler.java           | 30 +++++++++++++++++-----
 .../deployment/AbstractJobReconciler.java          | 13 +++++++---
 .../deployment/ApplicationReconciler.java          |  4 ++-
 .../operator/validation/DefaultValidator.java      |  6 +----
 .../deployment/ApplicationReconcilerTest.java      | 30 ++++++++++++++++++++++
 6 files changed, 69 insertions(+), 19 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index 78922f1..a91edef 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -94,10 +94,7 @@ public class SessionJobObserver implements 
Observer<FlinkSessionJob> {
 
     @Override
     public void observe(FlinkSessionJob flinkSessionJob, Context context) {
-        var lastReconciledSpec =
-                
flinkSessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec();
-
-        if (lastReconciledSpec == null) {
+        if 
(flinkSessionJob.getStatus().getReconciliationStatus().isFirstDeployment()) {
             return;
         }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 64f15d8..2d26e82 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -23,12 +23,12 @@ import 
org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptio
 import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
 import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
 import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
-import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -140,7 +140,7 @@ public abstract class AbstractFlinkResourceReconciler<
                     EventRecorder.Component.JobManagerDeployment,
                     MSG_SPEC_CHANGED);
             reconcileSpecChange(cr, observeConfig, deployConfig);
-        } else if (shouldRollBack(reconciliationStatus, observeConfig)) {
+        } else if (shouldRollBack(cr, observeConfig)) {
             // Rollbacks are executed in two steps, we initiate it first then 
return
             if (initiateRollBack(status)) {
                 return;
@@ -284,13 +284,14 @@ public abstract class AbstractFlinkResourceReconciler<
      *
      * <p>Rollbacks are only supported to previously running resource specs 
with HA enabled.
      *
-     * @param reconciliationStatus ReconciliationStatus of the resource.
+     * @param resource Resource being reconciled.
      * @param configuration Flink cluster configuration.
      * @return True if the resource should be rolled back.
      */
     private boolean shouldRollBack(
-            ReconciliationStatus<SPEC> reconciliationStatus, Configuration 
configuration) {
+            AbstractFlinkResource<SPEC, STATUS> resource, Configuration 
configuration) {
 
+        var reconciliationStatus = 
resource.getStatus().getReconciliationStatus();
         if (reconciliationStatus.getState() == 
ReconciliationState.ROLLING_BACK) {
             return true;
         }
@@ -302,13 +303,22 @@ public abstract class AbstractFlinkResourceReconciler<
         }
 
         var lastStableSpec = reconciliationStatus.deserializeLastStableSpec();
-        if (lastStableSpec != null
-                && lastStableSpec.getJob() != null
+        if (lastStableSpec == null) {
+            // Nothing to roll back to yet
+            return false;
+        }
+
+        if (lastStableSpec.getJob() != null
                 && lastStableSpec.getJob().getState() == JobState.SUSPENDED) {
             // Should not roll back to suspended state
             return false;
         }
 
+        if (flinkVersionChanged(resource.getSpec(), lastStableSpec)) {
+            // Should not roll back Flink version changes
+            return false;
+        }
+
         Duration readinessTimeout =
                 
configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT);
         if (!Instant.now()
@@ -375,4 +385,12 @@ public abstract class AbstractFlinkResourceReconciler<
                 && (deployment.getStatus().getJobManagerDeploymentStatus()
                         == JobManagerDeploymentStatus.MISSING);
     }
+
+    protected boolean flinkVersionChanged(SPEC oldSpec, SPEC newSpec) {
+        if (oldSpec instanceof FlinkDeploymentSpec) {
+            return ((FlinkDeploymentSpec) oldSpec).getFlinkVersion()
+                    != ((FlinkDeploymentSpec) newSpec).getFlinkVersion();
+        }
+        return false;
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 813c152..7f4cbdf 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -98,6 +98,7 @@ public abstract class AbstractJobReconciler<
             if (availableUpgradeMode.isEmpty()) {
                 return;
             }
+
             eventRecorder.triggerEvent(
                     resource,
                     EventRecorder.Type.Normal,
@@ -140,7 +141,7 @@ public abstract class AbstractJobReconciler<
 
         if (upgradeMode == UpgradeMode.STATELESS) {
             LOG.info("Stateless job, ready for upgrade");
-            return Optional.of(upgradeMode);
+            return Optional.of(UpgradeMode.STATELESS);
         }
 
         if (ReconciliationUtils.isJobInTerminalState(status)) {
@@ -155,9 +156,15 @@ public abstract class AbstractJobReconciler<
                 LOG.info(
                         "Using savepoint upgrade mode when switching to 
last-state without HA previously enabled");
                 return Optional.of(UpgradeMode.SAVEPOINT);
-            } else {
-                return Optional.of(upgradeMode);
             }
+
+            if (flinkVersionChanged(
+                    ReconciliationUtils.getDeployedSpec(resource), 
resource.getSpec())) {
+                LOG.info("Using savepoint upgrade mode when upgrading Flink 
version");
+                return Optional.of(UpgradeMode.SAVEPOINT);
+            }
+
+            return Optional.of(upgradeMode);
         }
 
         return Optional.empty();
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 2747349..df0efe3 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -91,7 +91,9 @@ public class ApplicationReconciler
                                 
.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED)
                 && FlinkUtils.isKubernetesHAActivated(deployConfig)
                 && FlinkUtils.isKubernetesHAActivated(observeConfig)
-                && flinkService.isHaMetadataAvailable(deployConfig)) {
+                && flinkService.isHaMetadataAvailable(deployConfig)
+                && !flinkVersionChanged(
+                        ReconciliationUtils.getDeployedSpec(deployment), 
deployment.getSpec())) {
             LOG.info(
                     "Job is not running but HA metadata is available for last 
state restore, ready for upgrade");
             return Optional.of(UpgradeMode.LAST_STATE);
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index b589f22..de3c7b6 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -287,11 +287,7 @@ public class DefaultValidator implements 
FlinkResourceValidator {
             FlinkDeployment deployment, Map<String, String> effectiveConfig) {
         FlinkDeploymentSpec newSpec = deployment.getSpec();
 
-        if (deployment.getStatus() == null
-                || deployment.getStatus().getReconciliationStatus() == null
-                || 
deployment.getStatus().getReconciliationStatus().getLastReconciledSpec()
-                        == null) {
-            // New deployment
+        if 
(deployment.getStatus().getReconciliationStatus().isFirstDeployment()) {
             if (newSpec.getJob() != null && 
!newSpec.getJob().getState().equals(JobState.RUNNING)) {
                 return Optional.of("Job must start in running state");
             }
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 18202bf..439d848 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
 import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
 import 
org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
@@ -512,4 +513,33 @@ public class ApplicationReconcilerTest {
         Assertions.assertTrue(path2.startsWith(haStoragePath));
         assertNotEquals(path1, path2);
     }
+
+    @Test
+    public void testAlwaysSavepointOnFlinkVersionChange() throws Exception {
+        var deployment = TestUtils.buildApplicationCluster(FlinkVersion.v1_14);
+        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+
+        var ctx = flinkService.getContext();
+        reconciler.reconcile(deployment, ctx);
+
+        deployment.getSpec().setFlinkVersion(FlinkVersion.v1_15);
+
+        var reconStatus = deployment.getStatus().getReconciliationStatus();
+
+        // Do not trigger update until running
+        reconciler.reconcile(deployment, ctx);
+        assertEquals(ReconciliationState.DEPLOYED, reconStatus.getState());
+
+        
deployment.getStatus().getJobStatus().setState(JobState.RUNNING.name());
+        deployment
+                .getStatus()
+                .getJobStatus()
+                
.setJobId(flinkService.listJobs().get(0).f1.getJobId().toHexString());
+
+        reconciler.reconcile(deployment, ctx);
+        assertEquals(ReconciliationState.UPGRADING, reconStatus.getState());
+        assertEquals(
+                UpgradeMode.SAVEPOINT,
+                
reconStatus.deserializeLastReconciledSpec().getJob().getUpgradeMode());
+    }
 }

Reply via email to