This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b14606e [FLINK-38867] FlinkBlueGreenDeployment used initialSavepointPath on restartSavepointNonce
0b14606e is described below

commit 0b14606e15fd6182d582029cf7427a7ed40e86fd
Author: James Kan <[email protected]>
AuthorDate: Thu Dec 11 12:15:22 2025 -0800

    [FLINK-38867] FlinkBlueGreenDeployment used initialSavepointPath on restartSavepointNonce
    
    Co-authored-by: Daniel Rossos <[email protected]>
---
 .../operator/api/bluegreen/BlueGreenDiffType.java  |   8 +-
 .../bluegreen/BlueGreenDeploymentService.java      |  57 ++++++++-
 .../diff/FlinkBlueGreenDeploymentSpecDiff.java     |  21 ++-
 .../operator/utils/bluegreen/BlueGreenUtils.java   |  17 ++-
 .../FlinkBlueGreenDeploymentControllerTest.java    |  62 +++++++++
 .../diff/FlinkBlueGreenDeploymentSpecDiffTest.java |  57 ++++++++-
 .../utils/bluegreen/BlueGreenUtilsTest.java        | 141 +++++++++++++++++++++
 7 files changed, 347 insertions(+), 16 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/BlueGreenDiffType.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/BlueGreenDiffType.java
index 5581493f..e456a891 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/BlueGreenDiffType.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/BlueGreenDiffType.java
@@ -26,5 +26,11 @@ public enum BlueGreenDiffType {
     TRANSITION,
 
     /** Changes that only affect the child FlinkDeploymentSpec. */
-    PATCH_CHILD
+    PATCH_CHILD,
+
+    /**
+     * Full redeploy from user-specified savepoint. Triggered when 
savepointRedeployNonce changes.
+     * Uses the initialSavepointPath from spec instead of taking a new 
savepoint.
+     */
+    SAVEPOINT_REDEPLOY
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
index 85de365b..132b05e2 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
@@ -42,6 +42,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Instant;
+import java.util.Objects;
 
 import static 
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.deleteFlinkDeployment;
 import static 
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenKubernetesService.deployCluster;
@@ -143,6 +144,25 @@ public class BlueGreenDeploymentService {
                         
context.getDeploymentStatus().setSavepointTriggerId(null);
                         return markDeploymentFailing(context, error);
                     }
+
+                } else if (specDiff == BlueGreenDiffType.SAVEPOINT_REDEPLOY) {
+                    // Savepoint redeploy: skip taking a new savepoint, use 
initialSavepointPath
+                    var jobSpec =
+                            
context.getBgDeployment().getSpec().getTemplate().getSpec().getJob();
+                    LOG.info(
+                            "Savepoint redeploy triggered for '{}', using 
initialSavepointPath: {}",
+                            context.getBgDeployment().getMetadata().getName(),
+                            
Objects.toString(jobSpec.getInitialSavepointPath(), "<none>"));
+                    setLastReconciledSpec(context);
+                    try {
+                        return startSavepointRedeployTransition(
+                                context, currentBlueGreenDeploymentType);
+                    } catch (Exception e) {
+                        var error =
+                                "Could not start Savepoint Redeploy 
Transition. Details: "
+                                        + e.getMessage();
+                        return markDeploymentFailing(context, error);
+                    }
                 } else {
                     setLastReconciledSpec(context);
                     LOG.info(
@@ -170,6 +190,13 @@ public class BlueGreenDeploymentService {
 
     private UpdateControl<FlinkBlueGreenDeployment> patchFlinkDeployment(
             BlueGreenContext context, BlueGreenDeploymentType 
blueGreenDeploymentTypeToPatch) {
+        return patchFlinkDeployment(context, blueGreenDeploymentTypeToPatch, 
true);
+    }
+
+    private UpdateControl<FlinkBlueGreenDeployment> patchFlinkDeployment(
+            BlueGreenContext context,
+            BlueGreenDeploymentType blueGreenDeploymentTypeToPatch,
+            boolean carryOverSavepointInPatch) {
 
         String childDeploymentName =
                 context.getBgDeployment().getMetadata().getName()
@@ -188,7 +215,10 @@ public class BlueGreenDeploymentService {
         //  will it be used by this patching? otherwise this is unnecessary, 
keep lastSavepoint =
         // null.
         Savepoint lastSavepoint =
-                carryOverSavepoint(context, blueGreenDeploymentTypeToPatch, 
childDeploymentName);
+                carryOverSavepointInPatch
+                        ? carryOverSavepoint(
+                                context, blueGreenDeploymentTypeToPatch, 
childDeploymentName)
+                        : null;
 
         return initiateDeployment(
                 context,
@@ -246,6 +276,26 @@ public class BlueGreenDeploymentService {
                 false);
     }
 
+    /**
+     * Starts a transition for savepoint redeploy scenario. Unlike normal 
transitions, this does not
+     * take a new savepoint - it uses the initialSavepointPath specified in 
the spec.
+     *
+     * @param context the transition context
+     * @param currentBlueGreenDeploymentType the current deployment type
+     * @return UpdateControl for the deployment
+     */
+    private UpdateControl<FlinkBlueGreenDeployment> 
startSavepointRedeployTransition(
+            BlueGreenContext context, BlueGreenDeploymentType 
currentBlueGreenDeploymentType) {
+        DeploymentTransition transition = 
calculateTransition(currentBlueGreenDeploymentType);
+
+        return initiateDeployment(
+                context,
+                transition.nextBlueGreenDeploymentType,
+                transition.nextState,
+                null, // Use initialSavepointPath from spec
+                false);
+    }
+
     private DeploymentTransition calculateTransition(BlueGreenDeploymentType 
currentType) {
         if (BlueGreenDeploymentType.BLUE == currentType) {
             return new DeploymentTransition(
@@ -411,7 +461,10 @@ public class BlueGreenDeploymentService {
                         context.getDeploymentByType(oppositeDeploymentType)
                                 .getMetadata()
                                 .getName());
-                return patchFlinkDeployment(context, oppositeDeploymentType);
+                return patchFlinkDeployment(
+                        context,
+                        oppositeDeploymentType,
+                        diffType != BlueGreenDiffType.SAVEPOINT_REDEPLOY);
             }
         }
 
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiff.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiff.java
index 04c243d4..a43f3168 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiff.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiff.java
@@ -59,24 +59,31 @@ public class FlinkBlueGreenDeploymentSpecDiff {
         FlinkDeploymentSpec leftSpec = left.getTemplate().getSpec();
         FlinkDeploymentSpec rightSpec = right.getTemplate().getSpec();
 
-        // Used in Case 2 & 3: Delegate to ReflectiveDiffBuilder for nested 
spec comparison
+        // Used in Case 2, 3 & 4: Delegate to ReflectiveDiffBuilder for nested 
spec comparison
         // Calculate diffResult before comparison to apply in-place removal of 
ignored fields
         DiffResult<FlinkDeploymentSpec> diffResult =
                 new ReflectiveDiffBuilder<>(deploymentMode, leftSpec, 
rightSpec).build();
 
-        // Case 1: FlinkDeploymentSpecs are identical
+        // Extract the diff type from ReflectiveDiffBuilder result
+        DiffType diffType = diffResult.getType();
+
+        // Case 1: Check for savepoint redeploy first (savepointRedeployNonce 
changed)
+        // This takes precedence as it indicates user wants to redeploy from 
their
+        // initialSavepointPath
+        if (diffType == DiffType.SAVEPOINT_REDEPLOY) {
+            return BlueGreenDiffType.SAVEPOINT_REDEPLOY;
+        }
+
+        // Case 2: FlinkDeploymentSpecs are identical
         if (leftSpec.equals(rightSpec)) {
             return BlueGreenDiffType.IGNORE;
         }
 
-        // Extract the diff type from ReflectiveDiffBuilder result
-        DiffType diffType = diffResult.getType();
-
-        // Case 2: ReflectiveDiffBuilder returns IGNORE
+        // Case 3: ReflectiveDiffBuilder returns IGNORE
         if (diffType == DiffType.IGNORE) {
             return BlueGreenDiffType.PATCH_CHILD;
         } else {
-            // Case 3: ReflectiveDiffBuilder returns anything else map it to 
TRANSITION as well
+            // Case 4: ReflectiveDiffBuilder returns anything else map it to 
TRANSITION
             return BlueGreenDiffType.TRANSITION;
         }
     }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
index 98344a9b..27195908 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java
@@ -332,7 +332,12 @@ public class BlueGreenUtils {
                         "spec",
                         FlinkBlueGreenDeploymentSpec.class);
 
-        // The Blue/Green initialSavepointPath is only used for first-time 
deployments
+        // Determine which savepoint/checkpoint to restore from:
+        // 1. First-time deployments: use initialSavepointPath from spec (if 
set)
+        // 2. Normal transitions: use lastCheckpoint from previous deployment
+        // 3. Redeploy scenarios (lastCheckpoint is null): use 
initialSavepointPath from spec
+        //    - savepointRedeployNonce changed
+        //    - upgradeMode is STATELESS
         if (isFirstDeployment) {
             String initialSavepointPath =
                     
spec.getTemplate().getSpec().getJob().getInitialSavepointPath();
@@ -346,6 +351,16 @@ public class BlueGreenUtils {
             String location = lastCheckpoint.getLocation().replace("file:", 
"");
             LOG.info("Using Blue/Green savepoint/checkpoint: " + location);
             
spec.getTemplate().getSpec().getJob().setInitialSavepointPath(location);
+        } else {
+            String initialSavepointPath =
+                    
spec.getTemplate().getSpec().getJob().getInitialSavepointPath();
+            if (initialSavepointPath != null && 
!initialSavepointPath.isEmpty()) {
+                LOG.info(
+                        "Using user-specified initialSavepointPath for 
redeploy: {}",
+                        initialSavepointPath);
+            } else {
+                LOG.info("Starting fresh with no savepoint restoration");
+            }
         }
 
         flinkDeployment.setSpec(spec.getTemplate().getSpec());
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
index 229406bb..8240b8c1 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
@@ -189,6 +189,68 @@ public class FlinkBlueGreenDeploymentControllerTest {
         return rs;
     }
 
+    @ParameterizedTest
+    
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+    public void 
verifySavepointRedeployNonceTriggersTransitionWithInitialSavepointPath(
+            FlinkVersion flinkVersion) throws Exception {
+        // Start with SAVEPOINT upgrade mode so normally a savepoint would be 
taken
+        var blueGreenDeployment =
+                buildSessionCluster(
+                        TEST_DEPLOYMENT_NAME,
+                        TEST_NAMESPACE,
+                        flinkVersion,
+                        null,
+                        UpgradeMode.SAVEPOINT);
+        var rs = executeBasicDeployment(flinkVersion, blueGreenDeployment, 
false, null);
+
+        // Set initialSavepointPath and bump savepointRedeployNonce
+        String userSpecifiedSavepoint = "s3://bucket/my-specific-savepoint";
+        rs.deployment
+                .getSpec()
+                .getTemplate()
+                .getSpec()
+                .getJob()
+                .setInitialSavepointPath(userSpecifiedSavepoint);
+        
rs.deployment.getSpec().getTemplate().getSpec().getJob().setSavepointRedeployNonce(12345L);
+        rs.deployment
+                .getSpec()
+                .getConfiguration()
+                .put(DEPLOYMENT_DELETION_DELAY.key(), 
String.valueOf(ALT_DELETION_DELAY_VALUE));
+        kubernetesClient.resource(rs.deployment).createOrReplace();
+
+        // Reconcile - should skip savepointing and go directly to transition
+        rs = reconcile(rs.deployment);
+
+        // Verify: Should be TRANSITIONING_TO_GREEN (not SAVEPOINTING_BLUE)
+        assertEquals(
+                FlinkBlueGreenDeploymentState.TRANSITIONING_TO_GREEN,
+                rs.reconciledStatus.getBlueGreenState());
+
+        // Verify: Green deployment should use the user-specified 
initialSavepointPath
+        var flinkDeployments = getFlinkDeployments();
+        assertEquals(2, flinkDeployments.size());
+        assertEquals(
+                userSpecifiedSavepoint,
+                
flinkDeployments.get(1).getSpec().getJob().getInitialSavepointPath());
+
+        // Complete the transition
+        simulateSuccessfulJobStart(flinkDeployments.get(1));
+        rs = reconcile(rs.deployment);
+
+        // Wait for deletion delay
+        Thread.sleep(rs.updateControl.getScheduleDelay().get());
+        reconcile(rs.deployment);
+
+        // Verify Green is now active
+        flinkDeployments = getFlinkDeployments();
+        assertEquals(1, flinkDeployments.size());
+
+        rs = reconcile(rs.deployment);
+        assertEquals(
+                FlinkBlueGreenDeploymentState.ACTIVE_GREEN,
+                rs.reconciledStatus.getBlueGreenState());
+    }
+
     @ParameterizedTest
     
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void verifyFailureBeforeTransition(FlinkVersion flinkVersion) 
throws Exception {
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiffTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiffTest.java
index 9fe6b8bd..272730a0 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiffTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiffTest.java
@@ -201,13 +201,28 @@ public class FlinkBlueGreenDeploymentSpecDiffTest {
         FlinkBlueGreenDeploymentSpec spec1 = createBasicSpec();
         FlinkBlueGreenDeploymentSpec spec2 = createBasicSpec();
 
-        // Change nested spec property - setSavepointRedeployNonce triggers 
TRANSITION
+        // Change nested spec property - setSavepointRedeployNonce now 
triggers SAVEPOINT_REDEPLOY
         
spec2.getTemplate().getSpec().getJob().setSavepointRedeployNonce(12345L);
 
         FlinkBlueGreenDeploymentSpecDiff diff =
                 new FlinkBlueGreenDeploymentSpecDiff(DEPLOYMENT_MODE, spec1, 
spec2);
 
-        assertEquals(BlueGreenDiffType.TRANSITION, diff.compare());
+        assertEquals(BlueGreenDiffType.SAVEPOINT_REDEPLOY, diff.compare());
+    }
+
+    @Test
+    public void testSavepointRedeployForNonceChangeWithJarUpdate() {
+        FlinkBlueGreenDeploymentSpec spec1 = createBasicSpec();
+        FlinkBlueGreenDeploymentSpec spec2 = createBasicSpec();
+
+        // Nonce change with additional jarURI change - SAVEPOINT_REDEPLOY 
should still be detected
+        
spec2.getTemplate().getSpec().getJob().setJarURI("local:///opt/flink/examples/other.jar");
+        
spec2.getTemplate().getSpec().getJob().setSavepointRedeployNonce(12345L);
+
+        FlinkBlueGreenDeploymentSpecDiff diff =
+                new FlinkBlueGreenDeploymentSpecDiff(DEPLOYMENT_MODE, spec1, 
spec2);
+
+        assertEquals(BlueGreenDiffType.SAVEPOINT_REDEPLOY, diff.compare());
     }
 
     @Test
@@ -281,8 +296,8 @@ public class FlinkBlueGreenDeploymentSpecDiffTest {
         FlinkBlueGreenDeploymentSpec spec2 = createBasicSpec();
 
         // Change both top-level (configuration) and nested spec
-        // With new logic, only nested spec changes matter - 
setSavepointRedeployNonce triggers
-        // TRANSITION
+        // With new logic, only nested spec changes matter - 
setSavepointRedeployNonce now
+        // triggers SAVEPOINT_REDEPLOY
         Map<String, String> config = new HashMap<>();
         config.put("custom.config", "different-value");
         spec2.setConfiguration(config);
@@ -291,7 +306,39 @@ public class FlinkBlueGreenDeploymentSpecDiffTest {
         FlinkBlueGreenDeploymentSpecDiff diff =
                 new FlinkBlueGreenDeploymentSpecDiff(DEPLOYMENT_MODE, spec1, 
spec2);
 
-        assertEquals(BlueGreenDiffType.TRANSITION, diff.compare());
+        assertEquals(BlueGreenDiffType.SAVEPOINT_REDEPLOY, diff.compare());
+    }
+
+    @Test
+    public void testSavepointRedeployTakesPrecedenceOverScale() {
+        FlinkBlueGreenDeploymentSpec spec1 = createBasicSpec();
+        FlinkBlueGreenDeploymentSpec spec2 = createBasicSpec();
+
+        // Change both nonce (SAVEPOINT_REDEPLOY) AND parallelism (SCALE)
+        // SAVEPOINT_REDEPLOY should take precedence
+        
spec2.getTemplate().getSpec().getJob().setSavepointRedeployNonce(12345L);
+        spec2.getTemplate().getSpec().getJob().setParallelism(10);
+
+        FlinkBlueGreenDeploymentSpecDiff diff =
+                new FlinkBlueGreenDeploymentSpecDiff(DEPLOYMENT_MODE, spec1, 
spec2);
+
+        assertEquals(BlueGreenDiffType.SAVEPOINT_REDEPLOY, diff.compare());
+    }
+
+    @Test
+    public void testSavepointRedeployTakesPrecedenceOverUpgrade() {
+        FlinkBlueGreenDeploymentSpec spec1 = createBasicSpec();
+        FlinkBlueGreenDeploymentSpec spec2 = createBasicSpec();
+
+        // Change both nonce (SAVEPOINT_REDEPLOY) AND Flink version (UPGRADE)
+        // SAVEPOINT_REDEPLOY should take precedence
+        
spec2.getTemplate().getSpec().getJob().setSavepointRedeployNonce(12345L);
+        spec2.getTemplate().getSpec().setFlinkVersion(FlinkVersion.v1_17);
+
+        FlinkBlueGreenDeploymentSpecDiff diff =
+                new FlinkBlueGreenDeploymentSpecDiff(DEPLOYMENT_MODE, spec1, 
spec2);
+
+        assertEquals(BlueGreenDiffType.SAVEPOINT_REDEPLOY, diff.compare());
     }
 
     @Test
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java
index 859d19f5..a898662e 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java
@@ -27,6 +27,9 @@ import 
org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentTemplateSpec
 import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
 import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
+import org.apache.flink.kubernetes.operator.api.status.Savepoint;
+import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
+import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
 import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
 import 
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
 
@@ -38,6 +41,9 @@ import java.util.Map;
 import java.util.UUID;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Tests for {@link BlueGreenUtils}. */
 public class BlueGreenUtilsTest {
@@ -83,6 +89,141 @@ public class BlueGreenUtilsTest {
         assertEquals(parentDeploymentName + ".jm", 
resultFlinkConfig.get("metrics.scope.jm"));
     }
 
+    @Test
+    public void testSavepointRequiredBasedOnUpgradeMode() {
+        // SAVEPOINT mode requires savepoint
+        FlinkBlueGreenDeployment bgDeployment =
+                buildBlueGreenDeployment("test-app", TEST_NAMESPACE);
+        bgDeployment
+                .getSpec()
+                .getTemplate()
+                .getSpec()
+                .getJob()
+                .setUpgradeMode(UpgradeMode.SAVEPOINT);
+        BlueGreenContext context = createContext(bgDeployment);
+        assertTrue(BlueGreenUtils.isSavepointRequired(context));
+
+        // LAST_STATE mode requires savepoint
+        bgDeployment
+                .getSpec()
+                .getTemplate()
+                .getSpec()
+                .getJob()
+                .setUpgradeMode(UpgradeMode.LAST_STATE);
+        assertTrue(BlueGreenUtils.isSavepointRequired(context));
+
+        // STATELESS mode does not require savepoint
+        bgDeployment
+                .getSpec()
+                .getTemplate()
+                .getSpec()
+                .getJob()
+                .setUpgradeMode(UpgradeMode.STATELESS);
+        assertFalse(BlueGreenUtils.isSavepointRequired(context));
+    }
+
+    @Test
+    public void testPrepareFlinkDeploymentStatelessInitialSavepointPath() {
+        // Setup: STATELESS mode with initialSavepointPath set
+        FlinkBlueGreenDeployment bgDeployment =
+                buildBlueGreenDeployment("test-app", TEST_NAMESPACE);
+        bgDeployment
+                .getSpec()
+                .getTemplate()
+                .getSpec()
+                .getJob()
+                .setUpgradeMode(UpgradeMode.STATELESS);
+        bgDeployment
+                .getSpec()
+                .getTemplate()
+                .getSpec()
+                .getJob()
+                .setInitialSavepointPath("s3://bucket/savepoint-xyz");
+
+        BlueGreenContext context = createContext(bgDeployment);
+
+        // Act: Prepare deployment with null lastCheckpoint (STATELESS 
transition)
+        FlinkDeployment result =
+                BlueGreenUtils.prepareFlinkDeployment(
+                        context,
+                        BlueGreenDeploymentType.GREEN,
+                        null, // No lastCheckpoint
+                        false, // Not first deployment
+                        bgDeployment.getMetadata());
+
+        // Assert: initialSavepointPath should be used for STATELESS
+        assertNotNull(result.getSpec().getJob().getInitialSavepointPath());
+    }
+
+    @Test
+    public void testNullLastCheckpointUsesInitialSavepointPath() {
+        // lastCheckpoint=null -> use initialSavepointPath from spec
+        FlinkBlueGreenDeployment bgDeployment =
+                buildBlueGreenDeployment("test-app", TEST_NAMESPACE);
+        bgDeployment
+                .getSpec()
+                .getTemplate()
+                .getSpec()
+                .getJob()
+                .setUpgradeMode(UpgradeMode.SAVEPOINT);
+        String initialPath = "s3://bucket/user-specified-savepoint";
+        bgDeployment
+                .getSpec()
+                .getTemplate()
+                .getSpec()
+                .getJob()
+                .setInitialSavepointPath(initialPath);
+
+        BlueGreenContext context = createContext(bgDeployment);
+
+        FlinkDeployment result =
+                BlueGreenUtils.prepareFlinkDeployment(
+                        context,
+                        BlueGreenDeploymentType.GREEN,
+                        null, // null = nonce changed, no new savepoint taken
+                        false,
+                        bgDeployment.getMetadata());
+
+        assertEquals(initialPath, 
result.getSpec().getJob().getInitialSavepointPath());
+    }
+
+    @Test
+    public void testNormalTransitionUsesFreshSavepoint() {
+        // Normal transition → take fresh savepoint from running job → use 
that, not
+        // initialSavepointPath
+        FlinkBlueGreenDeployment bgDeployment =
+                buildBlueGreenDeployment("test-app", TEST_NAMESPACE);
+        bgDeployment
+                .getSpec()
+                .getTemplate()
+                .getSpec()
+                .getJob()
+                .setUpgradeMode(UpgradeMode.SAVEPOINT);
+        bgDeployment
+                .getSpec()
+                .getTemplate()
+                .getSpec()
+                .getJob()
+                .setInitialSavepointPath("s3://bucket/ignored");
+
+        BlueGreenContext context = createContext(bgDeployment);
+
+        String freshSavepoint = "s3://bucket/fresh-savepoint-from-running-job";
+        Savepoint triggered =
+                Savepoint.of(
+                        freshSavepoint, SnapshotTriggerType.UPGRADE, 
SavepointFormatType.CANONICAL);
+
+        FlinkDeployment result =
+                BlueGreenUtils.prepareFlinkDeployment(
+                        context,
+                        BlueGreenDeploymentType.GREEN,
+                        triggered, // Fresh savepoint provided
+                        false,
+                        bgDeployment.getMetadata());
+
+        assertEquals(freshSavepoint, 
result.getSpec().getJob().getInitialSavepointPath());
+    }
+
     private static FlinkBlueGreenDeployment buildBlueGreenDeployment(
             String name, String namespace) {
         var deployment = new FlinkBlueGreenDeployment();

Reply via email to