This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 760a20d  [FLINK-26577] Trigger a savepoint when upgrade mode changed to last-state and HA data not available
760a20d is described below

commit 760a20dd3f0c2088aeafd6613d9f486425fb55a3
Author: Yang Wang <[email protected]>
AuthorDate: Fri Mar 25 20:33:34 2022 +0800

    [FLINK-26577] Trigger a savepoint when upgrade mode changed to last-state and HA data not available
---
 .../operator/reconciler/JobReconciler.java         |  58 +++++++----
 .../operator/reconciler/ReconciliationUtils.java   |  20 ++++
 .../validation/DefaultDeploymentValidator.java     |  15 +++
 .../operator/reconciler/JobReconcilerTest.java     | 107 ++++++++++++++++++++-
 .../validation/DeploymentValidatorTest.java        |  29 ++++++
 5 files changed, 205 insertions(+), 24 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index 608ebeb..60c6851 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -34,6 +34,7 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.util.Preconditions;
 
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -82,7 +83,11 @@ public class JobReconciler extends BaseReconciler {
         }
 
         boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec);
-        if (specChanged && readyForSpecChanges(flinkApp)) {
+        if (specChanged) {
+            if (!inUpgradeableState(flinkApp)) {
+                LOG.info("Waiting for upgradeable state");
+                return;
+            }
             JobState currentJobState = lastReconciledSpec.getJob().getState();
             JobState desiredJobState = jobSpec.getState();
 
@@ -92,7 +97,7 @@ public class JobReconciler extends BaseReconciler {
                 if (desiredJobState == JobState.RUNNING) {
                     LOG.info("Upgrading running job, suspending first...");
                 }
-                printCancelLogs(upgradeMode, flinkApp.getMetadata().getName());
+                printCancelLogs(upgradeMode);
                 suspendJob(flinkApp, upgradeMode, effectiveConfig);
                 stateAfterReconcile = JobState.SUSPENDED;
             }
@@ -113,9 +118,12 @@ public class JobReconciler extends BaseReconciler {
         }
     }
 
-    private boolean readyForSpecChanges(FlinkDeployment deployment) {
-        if (deployment.getSpec().getJob().getUpgradeMode() != 
UpgradeMode.SAVEPOINT) {
-            // Only savepoint upgrade mode needs a running job
+    private boolean inUpgradeableState(FlinkDeployment deployment) {
+        if (deployment.getSpec().getJob().getUpgradeMode() != 
UpgradeMode.SAVEPOINT
+                && 
!ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously(
+                        deployment)) {
+            // Only savepoint upgrade mode or changed from stateless/savepoint 
to last-state while
+            // HA disabled previously need a running job
             return true;
         }
         return deployment.getStatus().getJobManagerDeploymentStatus()
@@ -155,7 +163,7 @@ public class JobReconciler extends BaseReconciler {
         deployFlinkJob(flinkApp, effectiveConfig, savepointOpt);
     }
 
-    private void printCancelLogs(UpgradeMode upgradeMode, String name) {
+    private void printCancelLogs(UpgradeMode upgradeMode) {
         switch (upgradeMode) {
             case STATELESS:
                 LOG.info("Cancelling job");
@@ -171,30 +179,40 @@ public class JobReconciler extends BaseReconciler {
         }
     }
 
-    private Optional<String> suspendJob(
+    private Optional<String> internalSuspendJob(
             FlinkDeployment flinkApp, UpgradeMode upgradeMode, Configuration 
effectiveConfig)
             throws Exception {
-
-        Optional<String> savepointOpt = Optional.empty();
+        final String jobIdString = 
flinkApp.getStatus().getJobStatus().getJobId();
+        // Always trigger a savepoint when upgrade mode changes from 
stateless/savepoint to
+        // last-state and HA is disabled previously. This is a safeguard to 
ensure the state is
+        // never lost.
+        if 
(ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously(flinkApp))
 {
+            return flinkService.cancelJob(
+                    
JobID.fromHexString(Preconditions.checkNotNull(jobIdString)),
+                    UpgradeMode.SAVEPOINT,
+                    effectiveConfig);
+        }
         if (upgradeMode == UpgradeMode.STATELESS) {
             shutdown(flinkApp, effectiveConfig);
-        } else {
-            String jobIdString = 
flinkApp.getStatus().getJobStatus().getJobId();
-            savepointOpt =
-                    flinkService.cancelJob(
-                            jobIdString != null ? 
JobID.fromHexString(jobIdString) : null,
-                            upgradeMode,
-                            effectiveConfig);
+            return Optional.empty();
         }
+        return flinkService.cancelJob(
+                jobIdString != null ? JobID.fromHexString(jobIdString) : null,
+                upgradeMode,
+                effectiveConfig);
+    }
+
+    private void suspendJob(
+            FlinkDeployment flinkApp, UpgradeMode upgradeMode, Configuration 
effectiveConfig)
+            throws Exception {
+        final Optional<String> savepointOpt =
+                internalSuspendJob(flinkApp, upgradeMode, effectiveConfig);
 
         JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
         jobStatus.setState(JobState.SUSPENDED.name());
         
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
         savepointOpt.ifPresent(
-                location -> {
-                    
jobStatus.getSavepointInfo().setLastSavepoint(Savepoint.of(location));
-                });
-        return savepointOpt;
+                location -> 
jobStatus.getSavepointInfo().setLastSavepoint(Savepoint.of(location)));
     }
 
     @Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 219ebbd..0adc0a7 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -17,11 +17,15 @@
 
 package org.apache.flink.kubernetes.operator.reconciler;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.Preconditions;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -118,6 +122,22 @@ public class ReconciliationUtils {
         return updateControl.rescheduleAfter(rescheduleAfter.toMillis());
     }
 
+    public static boolean 
isUpgradeModeChangedToLastStateAndHADisabledPreviously(
+            FlinkDeployment flinkApp) {
+        final FlinkDeploymentSpec lastReconciledSpec =
+                Preconditions.checkNotNull(
+                        
flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec());
+        final UpgradeMode previousUpgradeMode = 
lastReconciledSpec.getJob().getUpgradeMode();
+        final UpgradeMode currentUpgradeMode = 
flinkApp.getSpec().getJob().getUpgradeMode();
+
+        final Configuration lastReconciledFlinkConfig =
+                
Configuration.fromMap(lastReconciledSpec.getFlinkConfiguration());
+
+        return previousUpgradeMode != UpgradeMode.LAST_STATE
+                && currentUpgradeMode == UpgradeMode.LAST_STATE
+                && 
!HighAvailabilityMode.isHighAvailabilityModeActivated(lastReconciledFlinkConfig);
+    }
+
     private static boolean isJobUpgradeInProgress(FlinkDeployment current) {
         ReconciliationStatus reconciliationStatus = 
current.getStatus().getReconciliationStatus();
 
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
index 297ede0..9228454 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -32,6 +32,8 @@ import org.apache.flink.kubernetes.operator.crd.spec.Resource;
 import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
 import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
 import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.kubernetes.utils.Constants;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -251,6 +253,19 @@ public class DefaultDeploymentValidator implements FlinkDeploymentValidator {
                             == null)) {
                 return Optional.of("Cannot perform savepoint restore without a 
valid savepoint");
             }
+
+            if (StringUtils.isNullOrWhitespaceOnly(
+                            newSpec.getFlinkConfiguration()
+                                    
.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key()))
+                    && deployment.getStatus().getJobManagerDeploymentStatus()
+                            != JobManagerDeploymentStatus.MISSING
+                    && 
ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously(
+                            deployment)) {
+                return Optional.of(
+                        String.format(
+                                "Job could not be upgraded to last-state while 
config key[%s] is not set",
+                                
CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
+            }
         }
 
         return Optional.empty();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index d201961..8747056 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.reconciler;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
@@ -38,6 +39,7 @@ import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -209,14 +211,111 @@ public class JobReconcilerTest {
         
assertNull(spDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
     }
 
-    private void verifyAndSetRunningJobsToStatus(
-            FlinkDeployment deployment, List<Tuple2<String, JobStatusMessage>> 
runningJobs) {
-        assertEquals(1, runningJobs.size());
-        assertNull(runningJobs.get(0).f0);
+    @Test
+    public void 
testUpgradeModeChangedToLastStateShouldTriggerSavepointWhileHADisabled()
+            throws Exception {
+        final Context context = 
TestUtils.createContextWithReadyJobManagerDeployment();
+        final TestingFlinkService flinkService = new TestingFlinkService();
+
+        final JobReconciler reconciler =
+                new JobReconciler(null, flinkService, operatorConfiguration);
+        final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        final Configuration config = FlinkUtils.getEffectiveConfig(deployment, 
new Configuration());
+        
deployment.getSpec().getFlinkConfiguration().remove(HighAvailabilityOptions.HA_MODE.key());
+        config.removeConfig(HighAvailabilityOptions.HA_MODE);
+
+        reconciler.reconcile(deployment, context, config);
+        deployment
+                .getStatus()
+                .getReconciliationStatus()
+                
.setLastReconciledSpec(ReconciliationUtils.clone(deployment.getSpec()));
+
+        assertEquals(
+                JobManagerDeploymentStatus.DEPLOYING,
+                deployment.getStatus().getJobManagerDeploymentStatus());
+
+        // Not ready for spec changes, the reconciliation is not performed
+        final String newImage = "new-image-1";
+        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+        deployment.getSpec().setImage(newImage);
+        reconciler.reconcile(deployment, context, config);
+        reconciler.reconcile(deployment, context, config);
+        assertNull(config.get(SavepointConfigOptions.SAVEPOINT_PATH));
+        assertNull(flinkService.listJobs().get(0).f0);
+        assertNotEquals(
+                newImage,
+                deployment
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .getLastReconciledSpec()
+                        .getImage());
+
+        // Ready for spec changes, the reconciliation should be performed
+        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+        reconciler.reconcile(deployment, context, config);
+        reconciler.reconcile(deployment, context, config);
+        assertEquals(
+                newImage,
+                deployment
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .getLastReconciledSpec()
+                        .getImage());
+        // Upgrade mode changes from stateless to last-state should trigger a 
savepoint
+        final String expectedSavepointPath = "savepoint_0";
+        assertEquals(expectedSavepointPath, 
config.get(SavepointConfigOptions.SAVEPOINT_PATH));
+        final List<Tuple2<String, JobStatusMessage>> runningJobs = 
flinkService.listJobs();
+        assertEquals(expectedSavepointPath, runningJobs.get(0).f0);
+    }
+
+    @Test
+    public void 
testUpgradeModeChangedToLastStateShouldNotTriggerSavepointWhileHAEnabled()
+            throws Exception {
+        final Context context = 
TestUtils.createContextWithReadyJobManagerDeployment();
+        final TestingFlinkService flinkService = new TestingFlinkService();
+
+        final JobReconciler reconciler =
+                new JobReconciler(null, flinkService, operatorConfiguration);
+        final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        final Configuration config = FlinkUtils.getEffectiveConfig(deployment, 
new Configuration());
+
+        reconciler.reconcile(deployment, context, config);
         deployment
                 .getStatus()
                 .getReconciliationStatus()
                 
.setLastReconciledSpec(ReconciliationUtils.clone(deployment.getSpec()));
+        assertNotEquals(
+                UpgradeMode.LAST_STATE,
+                deployment
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .getLastReconciledSpec()
+                        .getJob()
+                        .getUpgradeMode());
+
+        final String newImage = "new-image-1";
+        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+        deployment.getSpec().setImage(newImage);
+        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+        reconciler.reconcile(deployment, context, config);
+        reconciler.reconcile(deployment, context, config);
+        assertEquals(
+                newImage,
+                deployment
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .getLastReconciledSpec()
+                        .getImage());
+        // Upgrade mode changes from stateless to last-state while HA enabled 
previously should not
+        // trigger a savepoint
+        assertNull(config.get(SavepointConfigOptions.SAVEPOINT_PATH));
+        assertNull(flinkService.listJobs().get(0).f0);
+    }
+
+    private void verifyAndSetRunningJobsToStatus(
+            FlinkDeployment deployment, List<Tuple2<String, JobStatusMessage>> 
runningJobs) {
+        assertEquals(1, runningJobs.size());
+        assertNull(runningJobs.get(0).f0);
 
         JobStatus jobStatus = new JobStatus();
         jobStatus.setJobName(runningJobs.get(0).f1.getJobName());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
index 9c71671..accce51 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.kubernetes.operator.validation;
 
 import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
@@ -29,6 +30,7 @@ import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
 import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
 import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
+import 
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.utils.Constants;
 
@@ -245,6 +247,33 @@ public class DeploymentValidatorTest {
                 },
                 "Cannot switch from session to job cluster");
 
+        // Test upgrade mode change validation
+        testError(
+                dep -> {
+                    
dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+                    dep.setStatus(new FlinkDeploymentStatus());
+                    dep.getStatus().setJobStatus(new JobStatus());
+
+                    dep.getStatus().setReconciliationStatus(new 
ReconciliationStatus());
+                    dep.getStatus()
+                            .getReconciliationStatus()
+                            
.setLastReconciledSpec(ReconciliationUtils.clone(dep.getSpec()));
+                    dep.getStatus()
+                            .getReconciliationStatus()
+                            .getLastReconciledSpec()
+                            .getJob()
+                            .setUpgradeMode(UpgradeMode.STATELESS);
+                    dep.getStatus()
+                            .getReconciliationStatus()
+                            .getLastReconciledSpec()
+                            .getFlinkConfiguration()
+                            .remove(HighAvailabilityOptions.HA_MODE.key());
+                    
dep.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+                },
+                String.format(
+                        "Job could not be upgraded to last-state while config 
key[%s] is not set",
+                        CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
+
         testError(dep -> dep.getSpec().setFlinkVersion(null), "Flink Version 
must be defined.");
 
         testSuccess(dep -> dep.getSpec().setFlinkVersion(FlinkVersion.v1_15));

Reply via email to