This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new f4b196f [FLINK-28350] Always use savepoint when switching between Flink versions
f4b196f is described below
commit f4b196f334f8adcaa3fb5d3d41a5b079675ef610
Author: Gyula Fora <[email protected]>
AuthorDate: Mon Jul 4 21:39:36 2022 +0200
[FLINK-28350] Always use savepoint when switching between Flink versions
---
.../observer/sessionjob/SessionJobObserver.java | 5 +---
.../AbstractFlinkResourceReconciler.java | 30 +++++++++++++++++-----
.../deployment/AbstractJobReconciler.java | 13 +++++++---
.../deployment/ApplicationReconciler.java | 4 ++-
.../operator/validation/DefaultValidator.java | 6 +----
.../deployment/ApplicationReconcilerTest.java | 30 ++++++++++++++++++++++
6 files changed, 69 insertions(+), 19 deletions(-)
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
index 78922f1..a91edef 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/sessionjob/SessionJobObserver.java
@@ -94,10 +94,7 @@ public class SessionJobObserver implements Observer<FlinkSessionJob> {
@Override
public void observe(FlinkSessionJob flinkSessionJob, Context context) {
- var lastReconciledSpec =
- flinkSessionJob.getStatus().getReconciliationStatus().getLastReconciledSpec();
-
- if (lastReconciledSpec == null) {
+ if (flinkSessionJob.getStatus().getReconciliationStatus().isFirstDeployment()) {
return;
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 64f15d8..2d26e82 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -23,12 +23,12 @@ import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptio
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
+import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
-import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
@@ -140,7 +140,7 @@ public abstract class AbstractFlinkResourceReconciler<
EventRecorder.Component.JobManagerDeployment,
MSG_SPEC_CHANGED);
reconcileSpecChange(cr, observeConfig, deployConfig);
- } else if (shouldRollBack(reconciliationStatus, observeConfig)) {
+ } else if (shouldRollBack(cr, observeConfig)) {
// Rollbacks are executed in two steps, we initiate it first then return
if (initiateRollBack(status)) {
return;
@@ -284,13 +284,14 @@ public abstract class AbstractFlinkResourceReconciler<
*
* <p>Rollbacks are only supported to previously running resource specs with HA enabled.
*
- * @param reconciliationStatus ReconciliationStatus of the resource.
+ * @param resource Resource being reconciled.
* @param configuration Flink cluster configuration.
* @return True if the resource should be rolled back.
*/
private boolean shouldRollBack(
- ReconciliationStatus<SPEC> reconciliationStatus, Configuration configuration) {
+ AbstractFlinkResource<SPEC, STATUS> resource, Configuration configuration) {
+ var reconciliationStatus = resource.getStatus().getReconciliationStatus();
if (reconciliationStatus.getState() == ReconciliationState.ROLLING_BACK) {
return true;
}
@@ -302,13 +303,22 @@ public abstract class AbstractFlinkResourceReconciler<
}
var lastStableSpec = reconciliationStatus.deserializeLastStableSpec();
- if (lastStableSpec != null
- && lastStableSpec.getJob() != null
+ if (lastStableSpec == null) {
+ // Nothing to roll back to yet
+ return false;
+ }
+
+ if (lastStableSpec.getJob() != null
&& lastStableSpec.getJob().getState() == JobState.SUSPENDED) {
// Should not roll back to suspended state
return false;
}
+ if (flinkVersionChanged(resource.getSpec(), lastStableSpec)) {
+ // Should not roll back Flink version changes
+ return false;
+ }
+
Duration readinessTimeout =
configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT);
if (!Instant.now()
@@ -375,4 +385,12 @@ public abstract class AbstractFlinkResourceReconciler<
&& (deployment.getStatus().getJobManagerDeploymentStatus()
== JobManagerDeploymentStatus.MISSING);
}
+
+ protected boolean flinkVersionChanged(SPEC oldSpec, SPEC newSpec) {
+ if (oldSpec instanceof FlinkDeploymentSpec) {
+ return ((FlinkDeploymentSpec) oldSpec).getFlinkVersion()
+ != ((FlinkDeploymentSpec) newSpec).getFlinkVersion();
+ }
+ return false;
+ }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 813c152..7f4cbdf 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -98,6 +98,7 @@ public abstract class AbstractJobReconciler<
if (availableUpgradeMode.isEmpty()) {
return;
}
+
eventRecorder.triggerEvent(
resource,
EventRecorder.Type.Normal,
@@ -140,7 +141,7 @@ public abstract class AbstractJobReconciler<
if (upgradeMode == UpgradeMode.STATELESS) {
LOG.info("Stateless job, ready for upgrade");
- return Optional.of(upgradeMode);
+ return Optional.of(UpgradeMode.STATELESS);
}
if (ReconciliationUtils.isJobInTerminalState(status)) {
@@ -155,9 +156,15 @@ public abstract class AbstractJobReconciler<
LOG.info(
"Using savepoint upgrade mode when switching to last-state without HA previously enabled");
return Optional.of(UpgradeMode.SAVEPOINT);
- } else {
- return Optional.of(upgradeMode);
}
+
+ if (flinkVersionChanged(
+ ReconciliationUtils.getDeployedSpec(resource), resource.getSpec())) {
+ LOG.info("Using savepoint upgrade mode when upgrading Flink version");
+ return Optional.of(UpgradeMode.SAVEPOINT);
+ }
+
+ return Optional.of(upgradeMode);
}
return Optional.empty();
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 2747349..df0efe3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -91,7 +91,9 @@ public class ApplicationReconciler
.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED)
&& FlinkUtils.isKubernetesHAActivated(deployConfig)
&& FlinkUtils.isKubernetesHAActivated(observeConfig)
- && flinkService.isHaMetadataAvailable(deployConfig)) {
+ && flinkService.isHaMetadataAvailable(deployConfig)
+ && !flinkVersionChanged(
+ ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) {
LOG.info(
"Job is not running but HA metadata is available for last state restore, ready for upgrade");
return Optional.of(UpgradeMode.LAST_STATE);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index b589f22..de3c7b6 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -287,11 +287,7 @@ public class DefaultValidator implements FlinkResourceValidator {
FlinkDeployment deployment, Map<String, String> effectiveConfig) {
FlinkDeploymentSpec newSpec = deployment.getSpec();
- if (deployment.getStatus() == null
- || deployment.getStatus().getReconciliationStatus() == null
- || deployment.getStatus().getReconciliationStatus().getLastReconciledSpec()
- == null) {
- // New deployment
+ if (deployment.getStatus().getReconciliationStatus().isFirstDeployment()) {
if (newSpec.getJob() != null && !newSpec.getJob().getState().equals(JobState.RUNNING)) {
return Optional.of("Job must start in running state");
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 18202bf..439d848 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
+import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
import org.apache.flink.kubernetes.operator.crd.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
@@ -512,4 +513,33 @@ public class ApplicationReconcilerTest {
Assertions.assertTrue(path2.startsWith(haStoragePath));
assertNotEquals(path1, path2);
}
+
+ @Test
+ public void testAlwaysSavepointOnFlinkVersionChange() throws Exception {
+ var deployment = TestUtils.buildApplicationCluster(FlinkVersion.v1_14);
+ deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+
+ var ctx = flinkService.getContext();
+ reconciler.reconcile(deployment, ctx);
+
+ deployment.getSpec().setFlinkVersion(FlinkVersion.v1_15);
+
+ var reconStatus = deployment.getStatus().getReconciliationStatus();
+
+ // Do not trigger update until running
+ reconciler.reconcile(deployment, ctx);
+ assertEquals(ReconciliationState.DEPLOYED, reconStatus.getState());
+
+ deployment.getStatus().getJobStatus().setState(JobState.RUNNING.name());
+ deployment
+ .getStatus()
+ .getJobStatus()
+ .setJobId(flinkService.listJobs().get(0).f1.getJobId().toHexString());
+
+ reconciler.reconcile(deployment, ctx);
+ assertEquals(ReconciliationState.UPGRADING, reconStatus.getState());
+ assertEquals(
+ UpgradeMode.SAVEPOINT,
+ reconStatus.deserializeLastReconciledSpec().getJob().getUpgradeMode());
+ }
}