This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 0b14606e [FLINK-38867] FlinkBlueGreenDeployment used
initialSavepointPath on restartSavepointNonce
0b14606e is described below
commit 0b14606e15fd6182d582029cf7427a7ed40e86fd
Author: James Kan <[email protected]>
AuthorDate: Thu Dec 11 12:15:22 2025 -0800
[FLINK-38867] FlinkBlueGreenDeployment used initialSavepointPath on
restartSavepointNonce
Co-authored-by: Daniel Rossos <[email protected]>
---
.../operator/api/bluegreen/BlueGreenDiffType.java | 8 +-
.../bluegreen/BlueGreenDeploymentService.java | 57 ++++++++-
.../diff/FlinkBlueGreenDeploymentSpecDiff.java | 21 ++-
.../operator/utils/bluegreen/BlueGreenUtils.java | 17 ++-
.../FlinkBlueGreenDeploymentControllerTest.java | 62 +++++++++
.../diff/FlinkBlueGreenDeploymentSpecDiffTest.java | 57 ++++++++-
.../utils/bluegreen/BlueGreenUtilsTest.java | 141 +++++++++++++++++++++
7 files changed, 347 insertions(+), 16 deletions(-)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/BlueGreenDiffType.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/BlueGreenDiffType.java
index 5581493f..e456a891 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/BlueGreenDiffType.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/BlueGreenDiffType.java
@@ -26,5 +26,11 @@ public enum BlueGreenDiffType {
TRANSITION,
/** Changes that only affect the child FlinkDeploymentSpec. */
- PATCH_CHILD
+ PATCH_CHILD,
+
+ /**
+ * Full redeploy from user-specified savepoint. Triggered when
savepointRedeployNonce changes.
+ * Uses the initialSavepointPath from spec instead of taking a new
savepoint.
+ */
+ SAVEPOINT_REDEPLOY
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
index 85de365b..132b05e2 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
@@ -42,6 +42,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
+import java.util.Objects;
import static
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.deleteFlinkDeployment;
import static
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.deployCluster;
@@ -143,6 +144,25 @@ public class BlueGreenDeploymentService {
context.getDeploymentStatus().setSavepointTriggerId(null);
return markDeploymentFailing(context, error);
}
+
+ } else if (specDiff == BlueGreenDiffType.SAVEPOINT_REDEPLOY) {
+ // Savepoint redeploy: skip taking a new savepoint, use
initialSavepointPath
+ var jobSpec =
+
context.getBgDeployment().getSpec().getTemplate().getSpec().getJob();
+ LOG.info(
+ "Savepoint redeploy triggered for '{}', using
initialSavepointPath: {}",
+ context.getBgDeployment().getMetadata().getName(),
+
Objects.toString(jobSpec.getInitialSavepointPath(), "<none>"));
+ setLastReconciledSpec(context);
+ try {
+ return startSavepointRedeployTransition(
+ context, currentBlueGreenDeploymentType);
+ } catch (Exception e) {
+ var error =
+ "Could not start Savepoint Redeploy
Transition. Details: "
+ + e.getMessage();
+ return markDeploymentFailing(context, error);
+ }
} else {
setLastReconciledSpec(context);
LOG.info(
@@ -170,6 +190,13 @@ public class BlueGreenDeploymentService {
private UpdateControl<FlinkBlueGreenDeployment> patchFlinkDeployment(
BlueGreenContext context, BlueGreenDeploymentType
blueGreenDeploymentTypeToPatch) {
+ return patchFlinkDeployment(context, blueGreenDeploymentTypeToPatch,
true);
+ }
+
+ private UpdateControl<FlinkBlueGreenDeployment> patchFlinkDeployment(
+ BlueGreenContext context,
+ BlueGreenDeploymentType blueGreenDeploymentTypeToPatch,
+ boolean carryOverSavepointInPatch) {
String childDeploymentName =
context.getBgDeployment().getMetadata().getName()
@@ -188,7 +215,10 @@ public class BlueGreenDeploymentService {
// will it be used by this patching? otherwise this is unnecessary,
keep lastSavepoint =
// null.
Savepoint lastSavepoint =
- carryOverSavepoint(context, blueGreenDeploymentTypeToPatch,
childDeploymentName);
+ carryOverSavepointInPatch
+ ? carryOverSavepoint(
+ context, blueGreenDeploymentTypeToPatch,
childDeploymentName)
+ : null;
return initiateDeployment(
context,
@@ -246,6 +276,26 @@ public class BlueGreenDeploymentService {
false);
}
+ /**
+ * Starts a transition for savepoint redeploy scenario. Unlike normal
transitions, this does not
+ * take a new savepoint - it uses the initialSavepointPath specified in
the spec.
+ *
+ * @param context the transition context
+ * @param currentBlueGreenDeploymentType the current deployment type
+ * @return UpdateControl for the deployment
+ */
+ private UpdateControl<FlinkBlueGreenDeployment>
startSavepointRedeployTransition(
+ BlueGreenContext context, BlueGreenDeploymentType
currentBlueGreenDeploymentType) {
+ DeploymentTransition transition =
calculateTransition(currentBlueGreenDeploymentType);
+
+ return initiateDeployment(
+ context,
+ transition.nextBlueGreenDeploymentType,
+ transition.nextState,
+ null, // Use initialSavepointPath from spec
+ false);
+ }
+
private DeploymentTransition calculateTransition(BlueGreenDeploymentType
currentType) {
if (BlueGreenDeploymentType.BLUE == currentType) {
return new DeploymentTransition(
@@ -411,7 +461,10 @@ public class BlueGreenDeploymentService {
context.getDeploymentByType(oppositeDeploymentType)
.getMetadata()
.getName());
- return patchFlinkDeployment(context, oppositeDeploymentType);
+ return patchFlinkDeployment(
+ context,
+ oppositeDeploymentType,
+ diffType != BlueGreenDiffType.SAVEPOINT_REDEPLOY);
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiff.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiff.java
index 04c243d4..a43f3168 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiff.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiff.java
@@ -59,24 +59,31 @@ public class FlinkBlueGreenDeploymentSpecDiff {
FlinkDeploymentSpec leftSpec = left.getTemplate().getSpec();
FlinkDeploymentSpec rightSpec = right.getTemplate().getSpec();
- // Used in Case 2 & 3: Delegate to ReflectiveDiffBuilder for nested
spec comparison
+ // Used in Case 2, 3 & 4: Delegate to ReflectiveDiffBuilder for nested
spec comparison
// Calculate diffResult before comparison to apply in-place removal of
ignored fields
DiffResult<FlinkDeploymentSpec> diffResult =
new ReflectiveDiffBuilder<>(deploymentMode, leftSpec,
rightSpec).build();
- // Case 1: FlinkDeploymentSpecs are identical
+ // Extract the diff type from ReflectiveDiffBuilder result
+ DiffType diffType = diffResult.getType();
+
+ // Case 1: Check for savepoint redeploy first (savepointRedeployNonce
changed)
+ // This takes precedence as it indicates user wants to redeploy from
their
+ // initialSavepointPath
+ if (diffType == DiffType.SAVEPOINT_REDEPLOY) {
+ return BlueGreenDiffType.SAVEPOINT_REDEPLOY;
+ }
+
+ // Case 2: FlinkDeploymentSpecs are identical
if (leftSpec.equals(rightSpec)) {
return BlueGreenDiffType.IGNORE;
}
- // Extract the diff type from ReflectiveDiffBuilder result
- DiffType diffType = diffResult.getType();
-
- // Case 2: ReflectiveDiffBuilder returns IGNORE
+ // Case 3: ReflectiveDiffBuilder returns IGNORE
if (diffType == DiffType.IGNORE) {
return BlueGreenDiffType.PATCH_CHILD;
} else {
- // Case 3: ReflectiveDiffBuilder returns anything else map it to
TRANSITION as well
+ // Case 4: ReflectiveDiffBuilder returns anything else map it to
TRANSITION
return BlueGreenDiffType.TRANSITION;
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
index 98344a9b..27195908 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
@@ -332,7 +332,12 @@ public class BlueGreenUtils {
"spec",
FlinkBlueGreenDeploymentSpec.class);
- // The Blue/Green initialSavepointPath is only used for first-time
deployments
+ // Determine which savepoint/checkpoint to restore from:
+ // 1. First-time deployments: use initialSavepointPath from spec (if
set)
+ // 2. Normal transitions: use lastCheckpoint from previous deployment
+ // 3. Redeploy scenarios (lastCheckpoint is null): use
initialSavepointPath from spec
+ // - savepointRedeployNonce changed
+ // - upgradeMode is STATELESS
if (isFirstDeployment) {
String initialSavepointPath =
spec.getTemplate().getSpec().getJob().getInitialSavepointPath();
@@ -346,6 +351,16 @@ public class BlueGreenUtils {
String location = lastCheckpoint.getLocation().replace("file:",
"");
LOG.info("Using Blue/Green savepoint/checkpoint: " + location);
spec.getTemplate().getSpec().getJob().setInitialSavepointPath(location);
+ } else {
+ String initialSavepointPath =
+
spec.getTemplate().getSpec().getJob().getInitialSavepointPath();
+ if (initialSavepointPath != null &&
!initialSavepointPath.isEmpty()) {
+ LOG.info(
+ "Using user-specified initialSavepointPath for
redeploy: {}",
+ initialSavepointPath);
+ } else {
+ LOG.info("Starting fresh with no savepoint restoration");
+ }
}
flinkDeployment.setSpec(spec.getTemplate().getSpec());
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
index 229406bb..8240b8c1 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
@@ -189,6 +189,68 @@ public class FlinkBlueGreenDeploymentControllerTest {
return rs;
}
+ @ParameterizedTest
+
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+ public void
verifySavepointRedeployNonceTriggersTransitionWithInitialSavepointPath(
+ FlinkVersion flinkVersion) throws Exception {
+ // Start with SAVEPOINT upgrade mode so normally a savepoint would be
taken
+ var blueGreenDeployment =
+ buildSessionCluster(
+ TEST_DEPLOYMENT_NAME,
+ TEST_NAMESPACE,
+ flinkVersion,
+ null,
+ UpgradeMode.SAVEPOINT);
+ var rs = executeBasicDeployment(flinkVersion, blueGreenDeployment,
false, null);
+
+ // Set initialSavepointPath and bump savepointRedeployNonce
+ String userSpecifiedSavepoint = "s3://bucket/my-specific-savepoint";
+ rs.deployment
+ .getSpec()
+ .getTemplate()
+ .getSpec()
+ .getJob()
+ .setInitialSavepointPath(userSpecifiedSavepoint);
+
rs.deployment.getSpec().getTemplate().getSpec().getJob().setSavepointRedeployNonce(12345L);
+ rs.deployment
+ .getSpec()
+ .getConfiguration()
+ .put(DEPLOYMENT_DELETION_DELAY.key(),
String.valueOf(ALT_DELETION_DELAY_VALUE));
+ kubernetesClient.resource(rs.deployment).createOrReplace();
+
+ // Reconcile - should skip savepointing and go directly to transition
+ rs = reconcile(rs.deployment);
+
+ // Verify: Should be TRANSITIONING_TO_GREEN (not SAVEPOINTING_BLUE)
+ assertEquals(
+ FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN,
+ rs.reconciledStatus.getBlueGreenState());
+
+ // Verify: Green deployment should use the user-specified
initialSavepointPath
+ var flinkDeployments = getFlinkDeployments();
+ assertEquals(2, flinkDeployments.size());
+ assertEquals(
+ userSpecifiedSavepoint,
+
flinkDeployments.get(1).getSpec().getJob().getInitialSavepointPath());
+
+ // Complete the transition
+ simulateSuccessfulJobStart(flinkDeployments.get(1));
+ rs = reconcile(rs.deployment);
+
+ // Wait for deletion delay
+ Thread.sleep(rs.updateControl.getScheduleDelay().get());
+ reconcile(rs.deployment);
+
+ // Verify Green is now active
+ flinkDeployments = getFlinkDeployments();
+ assertEquals(1, flinkDeployments.size());
+
+ rs = reconcile(rs.deployment);
+ assertEquals(
+ FlinkBlueGreenDeploymentState.ACTIVE_GREEN,
+ rs.reconciledStatus.getBlueGreenState());
+ }
+
@ParameterizedTest
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
public void verifyFailureBeforeTransition(FlinkVersion flinkVersion)
throws Exception {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiffTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiffTest.java
index 9fe6b8bd..272730a0 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiffTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiffTest.java
@@ -201,13 +201,28 @@ public class FlinkBlueGreenDeploymentSpecDiffTest {
FlinkBlueGreenDeploymentSpec spec1 = createBasicSpec();
FlinkBlueGreenDeploymentSpec spec2 = createBasicSpec();
- // Change nested spec property - setSavepointRedeployNonce triggers
TRANSITION
+ // Change nested spec property - setSavepointRedeployNonce now
triggers SAVEPOINT_REDEPLOY
spec2.getTemplate().getSpec().getJob().setSavepointRedeployNonce(12345L);
FlinkBlueGreenDeploymentSpecDiff diff =
new FlinkBlueGreenDeploymentSpecDiff(DEPLOYMENT_MODE, spec1,
spec2);
- assertEquals(BlueGreenDiffType.TRANSITION, diff.compare());
+ assertEquals(BlueGreenDiffType.SAVEPOINT_REDEPLOY, diff.compare());
+ }
+
+ @Test
+ public void testSavepointRedeployForNonceChangeWithJarUpdate() {
+ FlinkBlueGreenDeploymentSpec spec1 = createBasicSpec();
+ FlinkBlueGreenDeploymentSpec spec2 = createBasicSpec();
+
+ // Nonce change with additional jarURI change - SAVEPOINT_REDEPLOY
should still be detected
+
spec2.getTemplate().getSpec().getJob().setJarURI("local:///opt/flink/examples/other.jar");
+
spec2.getTemplate().getSpec().getJob().setSavepointRedeployNonce(12345L);
+
+ FlinkBlueGreenDeploymentSpecDiff diff =
+ new FlinkBlueGreenDeploymentSpecDiff(DEPLOYMENT_MODE, spec1,
spec2);
+
+ assertEquals(BlueGreenDiffType.SAVEPOINT_REDEPLOY, diff.compare());
}
@Test
@@ -281,8 +296,8 @@ public class FlinkBlueGreenDeploymentSpecDiffTest {
FlinkBlueGreenDeploymentSpec spec2 = createBasicSpec();
// Change both top-level (configuration) and nested spec
- // With new logic, only nested spec changes matter -
setSavepointRedeployNonce triggers
- // TRANSITION
+ // With new logic, only nested spec changes matter -
setSavepointRedeployNonce now
+ // triggers SAVEPOINT_REDEPLOY
Map<String, String> config = new HashMap<>();
config.put("custom.config", "different-value");
spec2.setConfiguration(config);
@@ -291,7 +306,39 @@ public class FlinkBlueGreenDeploymentSpecDiffTest {
FlinkBlueGreenDeploymentSpecDiff diff =
new FlinkBlueGreenDeploymentSpecDiff(DEPLOYMENT_MODE, spec1,
spec2);
- assertEquals(BlueGreenDiffType.TRANSITION, diff.compare());
+ assertEquals(BlueGreenDiffType.SAVEPOINT_REDEPLOY, diff.compare());
+ }
+
+ @Test
+ public void testSavepointRedeployTakesPrecedenceOverScale() {
+ FlinkBlueGreenDeploymentSpec spec1 = createBasicSpec();
+ FlinkBlueGreenDeploymentSpec spec2 = createBasicSpec();
+
+ // Change both nonce (SAVEPOINT_REDEPLOY) AND parallelism (SCALE)
+ // SAVEPOINT_REDEPLOY should take precedence
+
spec2.getTemplate().getSpec().getJob().setSavepointRedeployNonce(12345L);
+ spec2.getTemplate().getSpec().getJob().setParallelism(10);
+
+ FlinkBlueGreenDeploymentSpecDiff diff =
+ new FlinkBlueGreenDeploymentSpecDiff(DEPLOYMENT_MODE, spec1,
spec2);
+
+ assertEquals(BlueGreenDiffType.SAVEPOINT_REDEPLOY, diff.compare());
+ }
+
+ @Test
+ public void testSavepointRedeployTakesPrecedenceOverUpgrade() {
+ FlinkBlueGreenDeploymentSpec spec1 = createBasicSpec();
+ FlinkBlueGreenDeploymentSpec spec2 = createBasicSpec();
+
+ // Change both nonce (SAVEPOINT_REDEPLOY) AND Flink version (UPGRADE)
+ // SAVEPOINT_REDEPLOY should take precedence
+
spec2.getTemplate().getSpec().getJob().setSavepointRedeployNonce(12345L);
+ spec2.getTemplate().getSpec().setFlinkVersion(FlinkVersion.v1_17);
+
+ FlinkBlueGreenDeploymentSpecDiff diff =
+ new FlinkBlueGreenDeploymentSpecDiff(DEPLOYMENT_MODE, spec1,
spec2);
+
+ assertEquals(BlueGreenDiffType.SAVEPOINT_REDEPLOY, diff.compare());
}
@Test
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java
index 859d19f5..a898662e 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java
@@ -27,6 +27,9 @@ import
org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentTemplateSpec
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
+import org.apache.flink.kubernetes.operator.api.status.Savepoint;
+import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
+import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
import
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
@@ -38,6 +41,9 @@ import java.util.Map;
import java.util.UUID;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/** Tests for {@link BlueGreenUtils}. */
public class BlueGreenUtilsTest {
@@ -83,6 +89,141 @@ public class BlueGreenUtilsTest {
assertEquals(parentDeploymentName + ".jm",
resultFlinkConfig.get("metrics.scope.jm"));
}
+ @Test
+ public void testSavepointRequiredBasedOnUpgradeMode() {
+ // SAVEPOINT mode requires savepoint
+ FlinkBlueGreenDeployment bgDeployment =
+ buildBlueGreenDeployment("test-app", TEST_NAMESPACE);
+ bgDeployment
+ .getSpec()
+ .getTemplate()
+ .getSpec()
+ .getJob()
+ .setUpgradeMode(UpgradeMode.SAVEPOINT);
+ BlueGreenContext context = createContext(bgDeployment);
+ assertTrue(BlueGreenUtils.isSavepointRequired(context));
+
+ // LAST_STATE mode requires savepoint
+ bgDeployment
+ .getSpec()
+ .getTemplate()
+ .getSpec()
+ .getJob()
+ .setUpgradeMode(UpgradeMode.LAST_STATE);
+ assertTrue(BlueGreenUtils.isSavepointRequired(context));
+
+ // STATELESS mode does not require savepoint
+ bgDeployment
+ .getSpec()
+ .getTemplate()
+ .getSpec()
+ .getJob()
+ .setUpgradeMode(UpgradeMode.STATELESS);
+ assertFalse(BlueGreenUtils.isSavepointRequired(context));
+ }
+
+ @Test
+ public void testPrepareFlinkDeploymentStatelessInitialSavepointPath() {
+ // Setup: STATELESS mode with initialSavepointPath set
+ FlinkBlueGreenDeployment bgDeployment =
+ buildBlueGreenDeployment("test-app", TEST_NAMESPACE);
+ bgDeployment
+ .getSpec()
+ .getTemplate()
+ .getSpec()
+ .getJob()
+ .setUpgradeMode(UpgradeMode.STATELESS);
+ bgDeployment
+ .getSpec()
+ .getTemplate()
+ .getSpec()
+ .getJob()
+ .setInitialSavepointPath("s3://bucket/savepoint-xyz");
+
+ BlueGreenContext context = createContext(bgDeployment);
+
+ // Act: Prepare deployment with null lastCheckpoint (STATELESS
transition)
+ FlinkDeployment result =
+ BlueGreenUtils.prepareFlinkDeployment(
+ context,
+ BlueGreenDeploymentType.GREEN,
+ null, // No lastCheckpoint
+ false, // Not first deployment
+ bgDeployment.getMetadata());
+
+ // Assert: initialSavepointPath should be used for STATELESS
+ assertNotNull(result.getSpec().getJob().getInitialSavepointPath());
+ }
+
+ @Test
+ public void testNullLastCheckpointUsesInitialSavepointPath() {
+ // lastCheckpoint=null -> use initialSavepointPath from spec
+ FlinkBlueGreenDeployment bgDeployment =
+ buildBlueGreenDeployment("test-app", TEST_NAMESPACE);
+ bgDeployment
+ .getSpec()
+ .getTemplate()
+ .getSpec()
+ .getJob()
+ .setUpgradeMode(UpgradeMode.SAVEPOINT);
+ String initialPath = "s3://bucket/user-specified-savepoint";
+ bgDeployment
+ .getSpec()
+ .getTemplate()
+ .getSpec()
+ .getJob()
+ .setInitialSavepointPath(initialPath);
+
+ BlueGreenContext context = createContext(bgDeployment);
+
+ FlinkDeployment result =
+ BlueGreenUtils.prepareFlinkDeployment(
+ context,
+ BlueGreenDeploymentType.GREEN,
+ null, // null = nonce changed, no new savepoint taken
+ false,
+ bgDeployment.getMetadata());
+
+ assertEquals(initialPath,
result.getSpec().getJob().getInitialSavepointPath());
+ }
+
+ @Test
+ public void testNormalTransitionUsesFreshSavepoint() {
+ // Normal transition → take fresh savepoint from running job → use
that, not
+ // initialSavepointPath
+ FlinkBlueGreenDeployment bgDeployment =
+ buildBlueGreenDeployment("test-app", TEST_NAMESPACE);
+ bgDeployment
+ .getSpec()
+ .getTemplate()
+ .getSpec()
+ .getJob()
+ .setUpgradeMode(UpgradeMode.SAVEPOINT);
+ bgDeployment
+ .getSpec()
+ .getTemplate()
+ .getSpec()
+ .getJob()
+ .setInitialSavepointPath("s3://bucket/ignored");
+
+ BlueGreenContext context = createContext(bgDeployment);
+
+ String freshSavepoint = "s3://bucket/fresh-savepoint-from-running-job";
+ Savepoint triggered =
+ Savepoint.of(
+ freshSavepoint, SnapshotTriggerType.UPGRADE,
SavepointFormatType.CANONICAL);
+
+ FlinkDeployment result =
+ BlueGreenUtils.prepareFlinkDeployment(
+ context,
+ BlueGreenDeploymentType.GREEN,
+ triggered, // Fresh savepoint provided
+ false,
+ bgDeployment.getMetadata());
+
+ assertEquals(freshSavepoint,
result.getSpec().getJob().getInitialSavepointPath());
+ }
+
private static FlinkBlueGreenDeployment buildBlueGreenDeployment(
String name, String namespace) {
var deployment = new FlinkBlueGreenDeployment();