This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 760a20d [FLINK-26577] Trigger a savepoint when upgrade mode changed
to last-state and HA data not available
760a20d is described below
commit 760a20dd3f0c2088aeafd6613d9f486425fb55a3
Author: Yang Wang <[email protected]>
AuthorDate: Fri Mar 25 20:33:34 2022 +0800
[FLINK-26577] Trigger a savepoint when upgrade mode changed to last-state
and HA data not available
---
.../operator/reconciler/JobReconciler.java | 58 +++++++----
.../operator/reconciler/ReconciliationUtils.java | 20 ++++
.../validation/DefaultDeploymentValidator.java | 15 +++
.../operator/reconciler/JobReconcilerTest.java | 107 ++++++++++++++++++++-
.../validation/DeploymentValidatorTest.java | 29 ++++++
5 files changed, 205 insertions(+), 24 deletions(-)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
index 608ebeb..60c6851 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/JobReconciler.java
@@ -34,6 +34,7 @@ import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.util.Preconditions;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
@@ -82,7 +83,11 @@ public class JobReconciler extends BaseReconciler {
}
boolean specChanged = !flinkApp.getSpec().equals(lastReconciledSpec);
- if (specChanged && readyForSpecChanges(flinkApp)) {
+ if (specChanged) {
+ if (!inUpgradeableState(flinkApp)) {
+ LOG.info("Waiting for upgradeable state");
+ return;
+ }
JobState currentJobState = lastReconciledSpec.getJob().getState();
JobState desiredJobState = jobSpec.getState();
@@ -92,7 +97,7 @@ public class JobReconciler extends BaseReconciler {
if (desiredJobState == JobState.RUNNING) {
LOG.info("Upgrading running job, suspending first...");
}
- printCancelLogs(upgradeMode, flinkApp.getMetadata().getName());
+ printCancelLogs(upgradeMode);
suspendJob(flinkApp, upgradeMode, effectiveConfig);
stateAfterReconcile = JobState.SUSPENDED;
}
@@ -113,9 +118,12 @@ public class JobReconciler extends BaseReconciler {
}
}
- private boolean readyForSpecChanges(FlinkDeployment deployment) {
- if (deployment.getSpec().getJob().getUpgradeMode() !=
UpgradeMode.SAVEPOINT) {
- // Only savepoint upgrade mode needs a running job
+ private boolean inUpgradeableState(FlinkDeployment deployment) {
+ if (deployment.getSpec().getJob().getUpgradeMode() !=
UpgradeMode.SAVEPOINT
+ &&
!ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously(
+ deployment)) {
+ // Only savepoint upgrade mode, or a change from stateless/savepoint
to last-state while
+ // HA was previously disabled, needs a running job
return true;
}
return deployment.getStatus().getJobManagerDeploymentStatus()
@@ -155,7 +163,7 @@ public class JobReconciler extends BaseReconciler {
deployFlinkJob(flinkApp, effectiveConfig, savepointOpt);
}
- private void printCancelLogs(UpgradeMode upgradeMode, String name) {
+ private void printCancelLogs(UpgradeMode upgradeMode) {
switch (upgradeMode) {
case STATELESS:
LOG.info("Cancelling job");
@@ -171,30 +179,40 @@ public class JobReconciler extends BaseReconciler {
}
}
- private Optional<String> suspendJob(
+ private Optional<String> internalSuspendJob(
FlinkDeployment flinkApp, UpgradeMode upgradeMode, Configuration
effectiveConfig)
throws Exception {
-
- Optional<String> savepointOpt = Optional.empty();
+ final String jobIdString =
flinkApp.getStatus().getJobStatus().getJobId();
+ // Always trigger a savepoint when upgrade mode changes from
stateless/savepoint to
+ // last-state and HA was disabled previously. This is a safeguard to
ensure the state is
+ // never lost.
+ if
(ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously(flinkApp))
{
+ return flinkService.cancelJob(
+
JobID.fromHexString(Preconditions.checkNotNull(jobIdString)),
+ UpgradeMode.SAVEPOINT,
+ effectiveConfig);
+ }
if (upgradeMode == UpgradeMode.STATELESS) {
shutdown(flinkApp, effectiveConfig);
- } else {
- String jobIdString =
flinkApp.getStatus().getJobStatus().getJobId();
- savepointOpt =
- flinkService.cancelJob(
- jobIdString != null ?
JobID.fromHexString(jobIdString) : null,
- upgradeMode,
- effectiveConfig);
+ return Optional.empty();
}
+ return flinkService.cancelJob(
+ jobIdString != null ? JobID.fromHexString(jobIdString) : null,
+ upgradeMode,
+ effectiveConfig);
+ }
+
+ private void suspendJob(
+ FlinkDeployment flinkApp, UpgradeMode upgradeMode, Configuration
effectiveConfig)
+ throws Exception {
+ final Optional<String> savepointOpt =
+ internalSuspendJob(flinkApp, upgradeMode, effectiveConfig);
JobStatus jobStatus = flinkApp.getStatus().getJobStatus();
jobStatus.setState(JobState.SUSPENDED.name());
flinkApp.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
savepointOpt.ifPresent(
- location -> {
-
jobStatus.getSavepointInfo().setLastSavepoint(Savepoint.of(location));
- });
- return savepointOpt;
+ location ->
jobStatus.getSavepointInfo().setLastSavepoint(Savepoint.of(location)));
}
@Override
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 219ebbd..0adc0a7 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -17,11 +17,15 @@
package org.apache.flink.kubernetes.operator.reconciler;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
+import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -118,6 +122,22 @@ public class ReconciliationUtils {
return updateControl.rescheduleAfter(rescheduleAfter.toMillis());
}
+ public static boolean
isUpgradeModeChangedToLastStateAndHADisabledPreviously(
+ FlinkDeployment flinkApp) {
+ final FlinkDeploymentSpec lastReconciledSpec =
+ Preconditions.checkNotNull(
+
flinkApp.getStatus().getReconciliationStatus().getLastReconciledSpec());
+ final UpgradeMode previousUpgradeMode =
lastReconciledSpec.getJob().getUpgradeMode();
+ final UpgradeMode currentUpgradeMode =
flinkApp.getSpec().getJob().getUpgradeMode();
+
+ final Configuration lastReconciledFlinkConfig =
+
Configuration.fromMap(lastReconciledSpec.getFlinkConfiguration());
+
+ return previousUpgradeMode != UpgradeMode.LAST_STATE
+ && currentUpgradeMode == UpgradeMode.LAST_STATE
+ &&
!HighAvailabilityMode.isHighAvailabilityModeActivated(lastReconciledFlinkConfig);
+ }
+
private static boolean isJobUpgradeInProgress(FlinkDeployment current) {
ReconciliationStatus reconciliationStatus =
current.getStatus().getReconciliationStatus();
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
index 297ede0..9228454 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultDeploymentValidator.java
@@ -32,6 +32,8 @@ import org.apache.flink.kubernetes.operator.crd.spec.Resource;
import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
+import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -251,6 +253,19 @@ public class DefaultDeploymentValidator implements
FlinkDeploymentValidator {
== null)) {
return Optional.of("Cannot perform savepoint restore without a
valid savepoint");
}
+
+ if (StringUtils.isNullOrWhitespaceOnly(
+ newSpec.getFlinkConfiguration()
+
.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key()))
+ && deployment.getStatus().getJobManagerDeploymentStatus()
+ != JobManagerDeploymentStatus.MISSING
+ &&
ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously(
+ deployment)) {
+ return Optional.of(
+ String.format(
+ "Job could not be upgraded to last-state while
config key[%s] is not set",
+
CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
+ }
}
return Optional.empty();
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
index d201961..8747056 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/JobReconcilerTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.kubernetes.operator.reconciler;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
@@ -38,6 +39,7 @@ import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -209,14 +211,111 @@ public class JobReconcilerTest {
assertNull(spDeployment.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
}
- private void verifyAndSetRunningJobsToStatus(
- FlinkDeployment deployment, List<Tuple2<String, JobStatusMessage>>
runningJobs) {
- assertEquals(1, runningJobs.size());
- assertNull(runningJobs.get(0).f0);
+ @Test
+ public void
testUpgradeModeChangedToLastStateShouldTriggerSavepointWhileHADisabled()
+ throws Exception {
+ final Context context =
TestUtils.createContextWithReadyJobManagerDeployment();
+ final TestingFlinkService flinkService = new TestingFlinkService();
+
+ final JobReconciler reconciler =
+ new JobReconciler(null, flinkService, operatorConfiguration);
+ final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ final Configuration config = FlinkUtils.getEffectiveConfig(deployment,
new Configuration());
+
deployment.getSpec().getFlinkConfiguration().remove(HighAvailabilityOptions.HA_MODE.key());
+ config.removeConfig(HighAvailabilityOptions.HA_MODE);
+
+ reconciler.reconcile(deployment, context, config);
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+
.setLastReconciledSpec(ReconciliationUtils.clone(deployment.getSpec()));
+
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYING,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+
+ // Not ready for spec changes, the reconciliation is not performed
+ final String newImage = "new-image-1";
+ deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+ deployment.getSpec().setImage(newImage);
+ reconciler.reconcile(deployment, context, config);
+ reconciler.reconcile(deployment, context, config);
+ assertNull(config.get(SavepointConfigOptions.SAVEPOINT_PATH));
+ assertNull(flinkService.listJobs().get(0).f0);
+ assertNotEquals(
+ newImage,
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .getLastReconciledSpec()
+ .getImage());
+
+ // Ready for spec changes, the reconciliation should be performed
+ verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+ reconciler.reconcile(deployment, context, config);
+ reconciler.reconcile(deployment, context, config);
+ assertEquals(
+ newImage,
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .getLastReconciledSpec()
+ .getImage());
+ // Upgrade mode changes from stateless to last-state should trigger a
savepoint
+ final String expectedSavepointPath = "savepoint_0";
+ assertEquals(expectedSavepointPath,
config.get(SavepointConfigOptions.SAVEPOINT_PATH));
+ final List<Tuple2<String, JobStatusMessage>> runningJobs =
flinkService.listJobs();
+ assertEquals(expectedSavepointPath, runningJobs.get(0).f0);
+ }
+
+ @Test
+ public void
testUpgradeModeChangedToLastStateShouldNotTriggerSavepointWhileHAEnabled()
+ throws Exception {
+ final Context context =
TestUtils.createContextWithReadyJobManagerDeployment();
+ final TestingFlinkService flinkService = new TestingFlinkService();
+
+ final JobReconciler reconciler =
+ new JobReconciler(null, flinkService, operatorConfiguration);
+ final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ final Configuration config = FlinkUtils.getEffectiveConfig(deployment,
new Configuration());
+
+ reconciler.reconcile(deployment, context, config);
deployment
.getStatus()
.getReconciliationStatus()
.setLastReconciledSpec(ReconciliationUtils.clone(deployment.getSpec()));
+ assertNotEquals(
+ UpgradeMode.LAST_STATE,
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .getLastReconciledSpec()
+ .getJob()
+ .getUpgradeMode());
+
+ final String newImage = "new-image-1";
+ deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+ deployment.getSpec().setImage(newImage);
+ verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+ reconciler.reconcile(deployment, context, config);
+ reconciler.reconcile(deployment, context, config);
+ assertEquals(
+ newImage,
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .getLastReconciledSpec()
+ .getImage());
+ // Upgrade mode changes from stateless to last-state while HA was enabled
previously should not
+ // trigger a savepoint
+ assertNull(config.get(SavepointConfigOptions.SAVEPOINT_PATH));
+ assertNull(flinkService.listJobs().get(0).f0);
+ }
+
+ private void verifyAndSetRunningJobsToStatus(
+ FlinkDeployment deployment, List<Tuple2<String, JobStatusMessage>>
runningJobs) {
+ assertEquals(1, runningJobs.size());
+ assertNull(runningJobs.get(0).f0);
JobStatus jobStatus = new JobStatus();
jobStatus.setJobName(runningJobs.get(0).f1.getJobName());
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
index 9c71671..accce51 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DeploymentValidatorTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.validation;
import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
@@ -29,6 +30,7 @@ import
org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
+import
org.apache.flink.kubernetes.operator.observer.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.utils.Constants;
@@ -245,6 +247,33 @@ public class DeploymentValidatorTest {
},
"Cannot switch from session to job cluster");
+ // Test upgrade mode change validation
+ testError(
+ dep -> {
+
dep.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+ dep.setStatus(new FlinkDeploymentStatus());
+ dep.getStatus().setJobStatus(new JobStatus());
+
+ dep.getStatus().setReconciliationStatus(new
ReconciliationStatus());
+ dep.getStatus()
+ .getReconciliationStatus()
+
.setLastReconciledSpec(ReconciliationUtils.clone(dep.getSpec()));
+ dep.getStatus()
+ .getReconciliationStatus()
+ .getLastReconciledSpec()
+ .getJob()
+ .setUpgradeMode(UpgradeMode.STATELESS);
+ dep.getStatus()
+ .getReconciliationStatus()
+ .getLastReconciledSpec()
+ .getFlinkConfiguration()
+ .remove(HighAvailabilityOptions.HA_MODE.key());
+
dep.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+ },
+ String.format(
+ "Job could not be upgraded to last-state while config
key[%s] is not set",
+ CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
+
testError(dep -> dep.getSpec().setFlinkVersion(null), "Flink Version
must be defined.");
testSuccess(dep -> dep.getSpec().setFlinkVersion(FlinkVersion.v1_15));