This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 662fa612 [FLINK-33107] Use correct upgrade mode when executing
rollback, simplify rollback flow
662fa612 is described below
commit 662fa612a8ab352e43ab8a99fa61aadfbe41e4d7
Author: Gyula Fora <[email protected]>
AuthorDate: Mon Sep 25 09:12:20 2023 +0200
[FLINK-33107] Use correct upgrade mode when executing rollback, simplify
rollback flow
---
.../operator/autoscaler/JobAutoScalerImpl.java | 2 +-
.../operator/reconciler/ReconciliationUtils.java | 78 ++++++++------
.../AbstractFlinkResourceReconciler.java | 116 ++++++++++-----------
.../deployment/AbstractJobReconciler.java | 5 +-
.../deployment/ApplicationReconciler.java | 10 +-
.../reconciler/deployment/SessionReconciler.java | 4 +-
.../operator/controller/RollbackTest.java | 7 +-
.../deployment/ApplicationReconcilerTest.java | 81 ++++++++++++++
8 files changed, 199 insertions(+), 104 deletions(-)
diff --git
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
index b3013a0a..402f88bf 100644
---
a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
+++
b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScalerImpl.java
@@ -171,7 +171,7 @@ public class JobAutoScalerImpl implements JobAutoScaler {
var status = resource.getStatus();
if (status.getLifecycleState() != ResourceLifecycleState.STABLE
||
!status.getJobStatus().getState().equals(JobStatus.RUNNING.name())) {
- LOG.info("Autoscaler is waiting for RUNNING job state");
+ LOG.info("Autoscaler is waiting for stable, running state");
lastEvaluatedMetrics.remove(resourceId);
return;
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 5bf120bf..a4c97bec 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -125,47 +125,59 @@ public class ReconciliationUtils {
// Clear errors
status.setError(null);
reconciliationStatus.setReconciliationTimestamp(clock.instant().toEpochMilli());
- ReconciliationState state;
- if (status.getReconciliationStatus().getState() ==
ReconciliationState.ROLLING_BACK) {
+
+ var state = reconciliationStatus.getState();
+ if (state == ReconciliationState.ROLLING_BACK) {
state = upgrading ? ReconciliationState.ROLLING_BACK :
ReconciliationState.ROLLED_BACK;
} else {
state = upgrading ? ReconciliationState.UPGRADING :
ReconciliationState.DEPLOYED;
}
reconciliationStatus.setState(state);
- SPEC clonedSpec;
- if (status.getReconciliationStatus().getState() ==
ReconciliationState.ROLLING_BACK
- || status.getReconciliationStatus().getState() ==
ReconciliationState.ROLLED_BACK) {
- clonedSpec = reconciliationStatus.deserializeLastReconciledSpec();
- } else {
- clonedSpec = ReconciliationUtils.clone(spec);
- }
- if (spec.getJob() != null) {
- // For jobs we have to adjust the reconciled spec
- var job = clonedSpec.getJob();
- job.setState(stateAfterReconcile);
-
- var lastSpec =
reconciliationStatus.deserializeLastReconciledSpec();
- if (lastSpec != null) {
- // We preserve the last snapshot triggers to not lose new
triggers during upgrade
-
job.setSavepointTriggerNonce(lastSpec.getJob().getSavepointTriggerNonce());
-
job.setCheckpointTriggerNonce(lastSpec.getJob().getCheckpointTriggerNonce());
- }
-
- if (target instanceof FlinkDeployment) {
- // For application deployments we update the taskmanager info
- ((FlinkDeploymentStatus) status)
- .setTaskManager(
- getTaskManagerInfo(
- target.getMetadata().getName(), conf,
stateAfterReconcile));
- }
- reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec,
target);
- if (spec.getJob().getState() == JobState.SUSPENDED) {
- // When a job is suspended by the user it is automatically
marked stable
- reconciliationStatus.markReconciledSpecAsStable();
+ if (state == ReconciliationState.ROLLING_BACK || state ==
ReconciliationState.ROLLED_BACK) {
+ var lastSpecWithMeta =
reconciliationStatus.deserializeLastReconciledSpecWithMeta();
+ var job = lastSpecWithMeta.getSpec().getJob();
+ if (job != null) {
+ // During the rollback we have to update the upgradeMode in
the lastReconciledSpec
+ // based on the rollback upgradeMode, this ensures that the
next upgrade can be
+ // executed correctly and we don't accidentally lose state.
+ job.setUpgradeMode(spec.getJob().getUpgradeMode());
+ reconciliationStatus.setLastReconciledSpec(
+ SpecUtils.writeSpecWithMeta(
+ lastSpecWithMeta.getSpec(),
lastSpecWithMeta.getMeta()));
}
} else {
- reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec,
target);
+ SPEC clonedSpec = ReconciliationUtils.clone(spec);
+ if (spec.getJob() != null) {
+ // For jobs we have to adjust the reconciled spec
+ var job = clonedSpec.getJob();
+ job.setState(stateAfterReconcile);
+
+ var lastSpec =
reconciliationStatus.deserializeLastReconciledSpec();
+ if (lastSpec != null) {
+ // We preserve the last snapshot triggers to not lose new
triggers during
+ // upgrade
+
job.setSavepointTriggerNonce(lastSpec.getJob().getSavepointTriggerNonce());
+
job.setCheckpointTriggerNonce(lastSpec.getJob().getCheckpointTriggerNonce());
+ }
+
+ if (target instanceof FlinkDeployment) {
+ // For application deployments we update the taskmanager
info
+ ((FlinkDeploymentStatus) status)
+ .setTaskManager(
+ getTaskManagerInfo(
+ target.getMetadata().getName(),
+ conf,
+ stateAfterReconcile));
+ }
+
reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec, target);
+ if (spec.getJob().getState() == JobState.SUSPENDED) {
+ // When a job is suspended by the user it is automatically
marked stable
+ reconciliationStatus.markReconciledSpecAsStable();
+ }
+ } else {
+
reconciliationStatus.serializeAndSetLastReconciledSpec(clonedSpec, target);
+ }
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 91edcf70..ffe636dc 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -47,7 +47,6 @@ import
org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -134,6 +133,7 @@ public abstract class AbstractFlinkResourceReconciler<
SPEC currentDeploySpec = cr.getSpec();
resourceScaler.scale(ctx);
+ var reconciliationState = reconciliationStatus.getState();
var specDiff =
new ReflectiveDiffBuilder<>(
ctx.getDeploymentMode(), lastReconciledSpec,
currentDeploySpec)
@@ -141,14 +141,14 @@ public abstract class AbstractFlinkResourceReconciler<
var diffType = specDiff.getType();
boolean specChanged =
- DiffType.IGNORE != diffType
- || reconciliationStatus.getState() ==
ReconciliationState.UPGRADING;
+ DiffType.IGNORE != diffType || reconciliationState ==
ReconciliationState.UPGRADING;
- if (reconciliationStatus.getState() ==
ReconciliationState.ROLLING_BACK) {
- specChanged = prepareCrForRollback(ctx, currentDeploySpec,
lastReconciledSpec);
+ if (shouldRollBack(ctx, specChanged, lastReconciledSpec)) {
+ prepareCrForRollback(ctx, specChanged, lastReconciledSpec);
+ specChanged = true;
+ diffType = DiffType.UPGRADE;
}
- var observeConfig = ctx.getObserveConfig();
if (specChanged) {
var deployConfig = ctx.getDeployConfig(cr.getSpec());
if (checkNewSpecAlreadyDeployed(cr, deployConfig)) {
@@ -160,7 +160,7 @@ public abstract class AbstractFlinkResourceReconciler<
boolean scaled = diffType != DiffType.UPGRADE && scale(ctx,
deployConfig);
// Reconcile spec change unless scaling was enough
- if (scaled || reconcileSpecChange(ctx, deployConfig)) {
+ if (scaled || reconcileSpecChange(ctx, deployConfig,
lastReconciledSpec)) {
// If we executed a scale or spec upgrade action we return,
otherwise we
// continue to reconcile other changes
return;
@@ -169,20 +169,7 @@ public abstract class AbstractFlinkResourceReconciler<
ReconciliationUtils.updateReconciliationMetadata(cr);
}
- if (shouldRollBack(ctx, observeConfig)) {
- // Rollbacks are executed in two steps, we initiate it first then
return
- if (initiateRollBack(status)) {
- return;
- }
- LOG.warn(MSG_ROLLBACK);
- eventRecorder.triggerEvent(
- cr,
- EventRecorder.Type.Normal,
- EventRecorder.Reason.Rollback,
- EventRecorder.Component.JobManagerDeployment,
- MSG_ROLLBACK,
- ctx.getKubernetesClient());
- } else if (!reconcileOtherChanges(ctx)) {
+ if (!reconcileOtherChanges(ctx)) {
LOG.info("Resource fully reconciled, nothing to do...");
}
}
@@ -243,11 +230,13 @@ public abstract class AbstractFlinkResourceReconciler<
*
* @param ctx Reconciliation context.
* @param deployConfig Deployment configuration.
+ * @param lastReconciledSpec Last reconciled spec
* @throws Exception Error during spec upgrade.
* @return True if spec change reconciliation was executed
*/
protected abstract boolean reconcileSpecChange(
- FlinkResourceContext<CR> ctx, Configuration deployConfig) throws
Exception;
+ FlinkResourceContext<CR> ctx, Configuration deployConfig, SPEC
lastReconciledSpec)
+ throws Exception;
/**
* Reconcile any other changes required for this resource that are
specific to the reconciler
@@ -347,17 +336,24 @@ public abstract class AbstractFlinkResourceReconciler<
* <p>Rollbacks are only supported to previously running resource specs
with HA enabled.
*
* @param ctx Reconciliation context.
- * @param configuration Flink cluster configuration.
+ * @param specChanged Flag indicating whether the spec changed
* @return True if the resource should be rolled back.
*/
- private boolean shouldRollBack(FlinkResourceContext<CR> ctx, Configuration
configuration) {
+ private boolean shouldRollBack(
+ FlinkResourceContext<CR> ctx, boolean specChanged, SPEC
lastReconciledSpec) {
var resource = ctx.getResource();
var reconciliationStatus =
resource.getStatus().getReconciliationStatus();
+ var configuration = ctx.getObserveConfig();
+
if (reconciliationStatus.getState() ==
ReconciliationState.ROLLING_BACK) {
return true;
}
+ if (specChanged) {
+ return false;
+ }
+
if
(!configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED)
|| reconciliationStatus.getState() ==
ReconciliationState.ROLLED_BACK
|| reconciliationStatus.isLastReconciledSpecStable()) {
@@ -389,8 +385,8 @@ public abstract class AbstractFlinkResourceReconciler<
return false;
}
- if (resource.getSpec().getJob() != null
- && resource.getSpec().getJob().getUpgradeMode() ==
UpgradeMode.SAVEPOINT
+ if (lastReconciledSpec.getJob() != null
+ && lastReconciledSpec.getJob().getUpgradeMode() ==
UpgradeMode.SAVEPOINT
&& FlinkUtils.jmPodNeverStarted(ctx.getJosdkContext())) {
// HA data not available as JM never start and relying on
SAVEPOINT upgrade mode
// Safe to rollback relying on savepoint
@@ -404,47 +400,49 @@ public abstract class AbstractFlinkResourceReconciler<
return haDataAvailable;
}
- /**
- * Initiate rollback process by changing the {@link ReconciliationState}
in the status.
- *
- * @param status Resource status.
- * @return True if a new rollback was initiated.
- */
- private boolean initiateRollBack(STATUS status) {
+ private void prepareCrForRollback(
+ FlinkResourceContext<CR> ctx, boolean specChanged, SPEC
lastReconciledSpec) {
+ var cr = ctx.getResource();
+ var status = cr.getStatus();
var reconciliationStatus = status.getReconciliationStatus();
+
if (reconciliationStatus.getState() !=
ReconciliationState.ROLLING_BACK) {
- LOG.warn("Preparing to roll back to last stable spec.");
- if (StringUtils.isEmpty(status.getError())) {
- status.setError(
- "Deployment is not ready within the configured
timeout, rolling back.");
- }
+ // When we initiate rollback we trigger a one time event
reconciliationStatus.setState(ReconciliationState.ROLLING_BACK);
- return true;
+ LOG.warn(MSG_ROLLBACK);
+ eventRecorder.triggerEvent(
+ ctx.getResource(),
+ EventRecorder.Type.Normal,
+ EventRecorder.Reason.Rollback,
+ EventRecorder.Component.JobManagerDeployment,
+ MSG_ROLLBACK,
+ ctx.getKubernetesClient());
+ } else {
+ if (lastReconciledSpec.getJob() != null) {
+ // The rollback SUSPENDED status is not recorded anywhere
currently. Since the
+ // reconciler looks at the lastReconciled spec state to decide
on the next action
+ // (cancel vs deploy) this is a simple trick to make the
rollback flow work
+ // correctly.
+ lastReconciledSpec.getJob().setState(JobState.SUSPENDED);
+ }
}
- return false;
- }
- private boolean prepareCrForRollback(
- FlinkResourceContext<CR> ctx, SPEC currentDeploySpec, SPEC
lastReconciledSpec) {
- var cr = ctx.getResource();
- var reconciliationStatus = cr.getStatus().getReconciliationStatus();
- // Spec has changed while rolling back we should apply new spec and
move to upgrading
- // state. Don't take in account changes on job.state as it could be
overriden to running if
- // the current spec is not valid
- if (lastReconciledSpec.getJob() != null) {
-
lastReconciledSpec.getJob().setState(currentDeploySpec.getJob().getState());
- }
- var specDiffRollingBack =
- new ReflectiveDiffBuilder<>(
- ctx.getDeploymentMode(), lastReconciledSpec,
currentDeploySpec)
- .build();
- if (DiffType.IGNORE != specDiffRollingBack.getType()) {
+ if (specChanged) {
+ // If spec has changed while rolling back we should apply new spec
and move to upgrading
+ // state to break out of the rollback flow.
reconciliationStatus.setState(ReconciliationState.UPGRADING);
} else {
- // Rely on the last stable spec if rolling back and no change in
the spec
-
cr.setSpec(cr.getStatus().getReconciliationStatus().deserializeLastStableSpec());
+ cr.setSpec(reconciliationStatus.deserializeLastStableSpec());
+ var job = cr.getSpec().getJob();
+ if (job != null) {
+ // The last stable spec may have a completely different
upgrade mode than what we
+ // used the last time. We set it based on the
lastReconciledSpec
+ job.setUpgradeMode(
+ lastReconciledSpec.getJob().getUpgradeMode() ==
UpgradeMode.STATELESS
+ ? UpgradeMode.STATELESS
+ : UpgradeMode.LAST_STATE);
+ }
}
- return true;
}
/**
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 6db4905c..c12367ff 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -85,13 +85,12 @@ public abstract class AbstractJobReconciler<
}
@Override
- protected boolean reconcileSpecChange(FlinkResourceContext<CR> ctx,
Configuration deployConfig)
+ protected boolean reconcileSpecChange(
+ FlinkResourceContext<CR> ctx, Configuration deployConfig, SPEC
lastReconciledSpec)
throws Exception {
var resource = ctx.getResource();
STATUS status = resource.getStatus();
- var reconciliationStatus = status.getReconciliationStatus();
- SPEC lastReconciledSpec =
reconciliationStatus.deserializeLastReconciledSpec();
SPEC currentDeploySpec = resource.getSpec();
JobState currentJobState = lastReconciledSpec.getJob().getState();
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index cc5594c2..f3719fd6 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -85,9 +85,13 @@ public class ApplicationReconciler
}
var flinkService = ctx.getFlinkService();
- if (deployConfig.getBoolean(
- KubernetesOperatorConfigOptions
-
.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED)
+ boolean lastStateAllowed =
+ deployment.getSpec().getJob().getUpgradeMode() ==
UpgradeMode.LAST_STATE
+ || deployConfig.getBoolean(
+ KubernetesOperatorConfigOptions
+
.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED);
+
+ if (lastStateAllowed
&&
HighAvailabilityMode.isHighAvailabilityModeActivated(deployConfig)
&&
HighAvailabilityMode.isHighAvailabilityModeActivated(ctx.getObserveConfig())
&& !flinkVersionChanged(
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index 1f6d2cbd..335f6259 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -60,7 +60,9 @@ public class SessionReconciler
@Override
protected boolean reconcileSpecChange(
- FlinkResourceContext<FlinkDeployment> ctx, Configuration
deployConfig)
+ FlinkResourceContext<FlinkDeployment> ctx,
+ Configuration deployConfig,
+ FlinkDeploymentSpec lastReconciledSpec)
throws Exception {
var deployment = ctx.getResource();
deleteSessionCluster(ctx);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
index 667a13c8..f6ca19ae 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/RollbackTest.java
@@ -374,11 +374,10 @@ public class RollbackTest {
assertFalse(deployment.getStatus().getReconciliationStatus().isLastReconciledSpecStable());
assertEquals(
- ReconciliationState.ROLLING_BACK,
+ deployment.getSpec().getJob() != null
+ ? ReconciliationState.ROLLING_BACK
+ : ReconciliationState.ROLLED_BACK,
deployment.getStatus().getReconciliationStatus().getState());
- assertEquals(
- "Deployment is not ready within the configured timeout,
rolling back.",
- deployment.getStatus().getError());
if (injectValidationError) {
deployment.getSpec().setLogConfiguration(Map.of("invalid",
"entry"));
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 504bf671..bc5c267b 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -136,6 +136,8 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
private FlinkOperatorConfiguration operatorConfig;
private ExecutorService executorService;
+ private Clock testClock = Clock.systemDefaultZone();
+
@Override
public void setup() {
appReconciler =
@@ -1159,4 +1161,83 @@ public class ApplicationReconcilerTest extends
OperatorTestBase {
.getMetadata()
.getGeneration());
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testRollbackUpgradeModeHandling(boolean jmStarted) throws
Exception {
+ var deployment = TestUtils.buildApplicationCluster();
+ deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+ offsetReconcilerClock(deployment, Duration.ZERO);
+
+ var flinkConfiguration = deployment.getSpec().getFlinkConfiguration();
+ flinkConfiguration.put(
+
KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED.key(), "true");
+ flinkConfiguration.put(
+
KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT.key(), "10s");
+ flinkConfiguration.put(
+
KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED
+ .key(),
+ "false");
+
+ // Initial deployment, mark as stable
+ reconciler.reconcile(deployment, context);
+ verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+
deployment.getStatus().getReconciliationStatus().markReconciledSpecAsStable();
+
+ // Submit invalid change
+ deployment.getSpec().getJob().setParallelism(9999);
+ reconciler.reconcile(deployment, context);
+ reconciler.reconcile(deployment, context);
+ assertEquals(1, flinkService.listJobs().size());
+ assertEquals(
+ UpgradeMode.STATELESS,
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .deserializeLastStableSpec()
+ .getJob()
+ .getUpgradeMode());
+ assertEquals(
+ UpgradeMode.SAVEPOINT,
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .deserializeLastReconciledSpec()
+ .getJob()
+ .getUpgradeMode());
+
+ // Trigger rollback by delaying the recovery
+ offsetReconcilerClock(deployment, Duration.ofSeconds(15));
+ flinkService.setHaDataAvailable(jmStarted);
+ flinkService.setJobManagerReady(jmStarted);
+ reconciler.reconcile(deployment, context);
+
+ assertEquals(
+ ReconciliationState.ROLLING_BACK,
+ deployment.getStatus().getReconciliationStatus().getState());
+ assertEquals(0, flinkService.listJobs().size());
+ assertEquals("FINISHED",
deployment.getStatus().getJobStatus().getState());
+ assertEquals(
+ jmStarted ? UpgradeMode.LAST_STATE : UpgradeMode.SAVEPOINT,
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .deserializeLastReconciledSpec()
+ .getJob()
+ .getUpgradeMode());
+
+ flinkService.setJobManagerReady(true);
+ reconciler.reconcile(deployment, context);
+
+ assertEquals(
+ ReconciliationState.ROLLED_BACK,
+ deployment.getStatus().getReconciliationStatus().getState());
+ assertEquals(1, flinkService.listJobs().size());
+ assertEquals("RECONCILING",
deployment.getStatus().getJobStatus().getState());
+ }
+
+ private void offsetReconcilerClock(FlinkDeployment dep, Duration offset) {
+ testClock = Clock.offset(testClock, offset);
+ appReconciler.setClock(testClock);
+ }
}