This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 2621c9ad [FLINK-38915] FlinkBlueGreenDeployment in-place suspension handler
2621c9ad is described below

commit 2621c9ad0884b8610caa2b15c62dbd10a26a6a53
Author: James Kan <[email protected]>
AuthorDate: Tue Jan 13 19:52:19 2026 -0800

    [FLINK-38915] FlinkBlueGreenDeployment in-place suspension handler
---
 .../operator/api/bluegreen/BlueGreenDiffType.java  |  14 +-
 .../bluegreen/BlueGreenDeploymentService.java      |  84 ++++++-
 .../handlers/InitializingBlueStateHandler.java     |  11 +
 .../diff/FlinkBlueGreenDeploymentSpecDiff.java     |  27 +++
 .../FlinkBlueGreenDeploymentControllerTest.java    | 242 +++++++++++++++++++++
 .../diff/FlinkBlueGreenDeploymentSpecDiffTest.java |  24 ++
 6 files changed, 398 insertions(+), 4 deletions(-)

diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/BlueGreenDiffType.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/BlueGreenDiffType.java
index e456a891..97a83ec2 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/BlueGreenDiffType.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/BlueGreenDiffType.java
@@ -32,5 +32,17 @@ public enum BlueGreenDiffType {
      * Full redeploy from user-specified savepoint. Triggered when 
savepointRedeployNonce changes.
      * Uses the initialSavepointPath from spec instead of taking a new 
savepoint.
      */
-    SAVEPOINT_REDEPLOY
+    SAVEPOINT_REDEPLOY,
+
+    /**
+     * In-place suspension. Triggered when job.state changes from RUNNING to 
SUSPENDED. Suspends the
+     * currently active child without creating a new deployment.
+     */
+    SUSPEND,
+
+    /**
+     * Resume from suspension. Triggered when job.state changes from SUSPENDED 
to RUNNING. Spins up
+     * the child with the current (potentially updated) spec.
+     */
+    RESUME
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
index 132b05e2..33cd868b 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import 
org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType;
 import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDiffType;
+import 
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
 import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
 import org.apache.flink.kubernetes.operator.api.status.Savepoint;
 import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
@@ -110,13 +111,38 @@ public class BlueGreenDeploymentService {
      */
     public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
             BlueGreenContext context, BlueGreenDeploymentType 
currentBlueGreenDeploymentType) {
+
         BlueGreenDiffType specDiff = getSpecDiff(context);
 
         if (specDiff != BlueGreenDiffType.IGNORE) {
             FlinkDeployment currentFlinkDeployment =
                     
context.getDeploymentByType(currentBlueGreenDeploymentType);
 
-            if (isFlinkDeploymentReady(currentFlinkDeployment)) {
+            if (specDiff == BlueGreenDiffType.SUSPEND && 
currentFlinkDeployment != null) {
+                setLastReconciledSpec(context);
+                LOG.info(
+                        "In-place suspension for '{}'",
+                        currentFlinkDeployment.getMetadata().getName());
+                return patchFlinkDeployment(context, 
currentBlueGreenDeploymentType);
+            }
+
+            if (specDiff == BlueGreenDiffType.RESUME && currentFlinkDeployment 
!= null) {
+                setLastReconciledSpec(context);
+                LOG.info(
+                        "In-place resume for '{}'", 
currentFlinkDeployment.getMetadata().getName());
+                return patchFlinkDeployment(context, 
currentBlueGreenDeploymentType);
+            }
+
+            // Check if child is currently suspended - if so, just patch specs 
without restart
+            if (isChildSuspended(currentFlinkDeployment)) {
+                setLastReconciledSpec(context);
+                LOG.info(
+                        "Spec change while suspended for '{}'",
+                        currentFlinkDeployment.getMetadata().getName());
+                return patchFlinkDeployment(context, 
currentBlueGreenDeploymentType);
+            }
+
+            if (currentFlinkDeployment != null && 
isFlinkDeploymentReady(currentFlinkDeployment)) {
                 if (specDiff == BlueGreenDiffType.TRANSITION) {
                     boolean savepointTriggered = false;
                     try {
@@ -173,12 +199,16 @@ public class BlueGreenDeploymentService {
             } else {
                 if (context.getDeploymentStatus().getJobStatus().getState() != 
JobStatus.FAILING) {
                     setLastReconciledSpec(context);
+                    var childName =
+                            currentFlinkDeployment != null
+                                    ? 
currentFlinkDeployment.getMetadata().getName()
+                                    : "null";
                     var error =
                             String.format(
                                     "Transition to %s not possible, current 
Flink Deployment '%s' is not READY. FAILING '%s'",
                                     
calculateTransition(currentBlueGreenDeploymentType)
                                             .nextBlueGreenDeploymentType,
-                                    
currentFlinkDeployment.getMetadata().getName(),
+                                    childName,
                                     
context.getBgDeployment().getMetadata().getName());
                     return markDeploymentFailing(context, error);
                 }
@@ -188,6 +218,16 @@ public class BlueGreenDeploymentService {
         return UpdateControl.noUpdate();
     }
 
+    /**
+     * Tells whether a child deployment's spec asks for its job to be suspended.
+     *
+     * @param deployment the child FlinkDeployment to inspect; may be {@code null}
+     * @return {@code true} only when the deployment, its spec and its job spec are all present and
+     *     the job spec's state is {@code SUSPENDED}; {@code false} otherwise
+     */
+    private boolean isChildSuspended(FlinkDeployment deployment) {
+        var childSpec = deployment != null ? deployment.getSpec() : null;
+        var jobSpec = childSpec != null ? childSpec.getJob() : null;
+        // Only the desired state declared in the spec is consulted, not the observed status.
+        return jobSpec != null
+                && org.apache.flink.kubernetes.operator.api.spec.JobState.SUSPENDED
+                        == jobSpec.getState();
+    }
+
     private UpdateControl<FlinkBlueGreenDeployment> patchFlinkDeployment(
             BlueGreenContext context, BlueGreenDeploymentType 
blueGreenDeploymentTypeToPatch) {
         return patchFlinkDeployment(context, blueGreenDeploymentTypeToPatch, 
true);
@@ -435,6 +475,16 @@ public class BlueGreenDeploymentService {
         TransitionState transitionState =
                 determineTransitionState(context, 
currentBlueGreenDeploymentType);
 
+        if (isChildSuspended(transitionState.nextDeployment)) {
+            if (transitionState.nextDeployment.getStatus().getLifecycleState()
+                    == ResourceLifecycleState.SUSPENDED) {
+                return finalizeSuspendedDeployment(context, 
transitionState.nextState);
+            } else {
+                return shouldWeAbort(
+                        context, transitionState.nextDeployment, 
transitionState.nextState);
+            }
+        }
+
         if (isFlinkDeploymentReady(transitionState.nextDeployment)) {
             return shouldWeDelete(
                     context,
@@ -447,11 +497,36 @@ public class BlueGreenDeploymentService {
         }
     }
 
+    /**
+     * Settles the blue/green resource once its child has reached the SUSPENDED lifecycle state.
+     *
+     * <p>Resets the deployment-ready and abort timestamps to {@code millisToInstantStr(0)}, clears
+     * any pending savepoint trigger id, and patches the status to {@code nextState} with a
+     * {@code SUSPENDED} job status.
+     *
+     * @param context the blue/green reconciliation context
+     * @param nextState the blue/green state to record
+     * @return a status update control that asks for an immediate reschedule
+     */
+    private UpdateControl<FlinkBlueGreenDeployment> finalizeSuspendedDeployment(
+            BlueGreenContext context, FlinkBlueGreenDeploymentState nextState) {
+        LOG.info(
+                "Finalizing suspended deployment '{}' to {} state",
+                context.getDeploymentName(),
+                nextState);
+
+        // Drop transition bookkeeping so nothing stale leaks into the next transition.
+        var bgStatus = context.getDeploymentStatus();
+        bgStatus.setDeploymentReadyTimestamp(millisToInstantStr(0));
+        bgStatus.setAbortTimestamp(millisToInstantStr(0));
+        bgStatus.setSavepointTriggerId(null);
+
+        var update = patchStatusUpdateControl(context, nextState, JobStatus.SUSPENDED, null);
+        // Reschedule with zero delay so the next reconcile runs right away.
+        return update.rescheduleAfter(0);
+    }
+
     private UpdateControl<FlinkBlueGreenDeployment> 
handleSpecChangesDuringTransition(
             BlueGreenContext context, BlueGreenDeploymentType 
currentBlueGreenDeploymentType) {
         if (hasSpecChanged(context)) {
             BlueGreenDiffType diffType = getSpecDiff(context);
 
+            // Block SUSPEND during transition - wait for transition to 
complete first
+            if (diffType == BlueGreenDiffType.SUSPEND) {
+                LOG.info(
+                        "Suspend requested during transition for '{}'. "
+                                + "Waiting for transition to complete before 
processing suspend.",
+                        context.getBgDeployment().getMetadata().getName());
+                return null;
+            }
+
             if (diffType != BlueGreenDiffType.IGNORE) {
                 setLastReconciledSpec(context);
                 var oppositeDeploymentType =
@@ -658,7 +733,10 @@ public class BlueGreenDeploymentService {
         context.getDeploymentStatus().setAbortTimestamp(millisToInstantStr(0));
         context.getDeploymentStatus().setSavepointTriggerId(null);
 
-        return patchStatusUpdateControl(context, nextState, JobStatus.RUNNING, 
null);
+        // Finalize status and reschedule immediately so any pending spec 
changes
+        // (e.g., suspend requested during transition) are picked up on next 
reconcile
+        return patchStatusUpdateControl(context, nextState, JobStatus.RUNNING, 
null)
+                .rescheduleAfter(0);
     }
 
     // ==================== Common Utility Methods ====================
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/InitializingBlueStateHandler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/InitializingBlueStateHandler.java
index f2882d46..8319314f 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/InitializingBlueStateHandler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/InitializingBlueStateHandler.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.kubernetes.operator.controller.bluegreen.handlers;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
 import 
org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType;
+import org.apache.flink.kubernetes.operator.api.spec.JobState;
 import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
 import 
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
@@ -27,6 +28,7 @@ import 
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploy
 
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 
+import static 
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploymentService.patchStatusUpdateControl;
 import static 
org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.hasSpecChanged;
 import static 
org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.setLastReconciledSpec;
 
@@ -41,6 +43,15 @@ public class InitializingBlueStateHandler extends 
AbstractBlueGreenStateHandler
     public UpdateControl<FlinkBlueGreenDeployment> handle(BlueGreenContext 
context) {
         FlinkBlueGreenDeploymentStatus deploymentStatus = 
context.getDeploymentStatus();
 
+        // Block initial deployment if job.state is SUSPENDED - user must 
start with RUNNING
+        var jobSpec = 
context.getBgDeployment().getSpec().getTemplate().getSpec().getJob();
+        if (jobSpec != null && jobSpec.getState() == JobState.SUSPENDED) {
+            LOG.info(
+                    "Blocking initial deployment '{}' - job.state is 
SUSPENDED, waiting for RUNNING",
+                    context.getBgDeployment().getMetadata().getName());
+            return patchStatusUpdateControl(context, null, 
JobStatus.SUSPENDED, null);
+        }
+
         // Deploy only if this is the initial deployment (no previous spec 
exists)
         // or if we're recovering from a failure and the spec has changed 
since the last attempt
         if (deploymentStatus.getLastReconciledSpec() == null
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiff.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiff.java
index a43f3168..3e5dba77 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiff.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiff.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDiffType;
 import org.apache.flink.kubernetes.operator.api.diff.DiffType;
 import 
org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.spec.JobState;
 import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
 
 import lombok.NonNull;
@@ -59,6 +60,19 @@ public class FlinkBlueGreenDeploymentSpecDiff {
         FlinkDeploymentSpec leftSpec = left.getTemplate().getSpec();
         FlinkDeploymentSpec rightSpec = right.getTemplate().getSpec();
 
+        // Check for suspend/resume state changes first - these take highest 
precedence
+        // for in-place suspension handling
+        JobState leftJobState = getJobState(leftSpec);
+        JobState rightJobState = getJobState(rightSpec);
+
+        if (leftJobState != rightJobState) {
+            if (rightJobState == JobState.SUSPENDED) {
+                return BlueGreenDiffType.SUSPEND;
+            } else if (leftJobState == JobState.SUSPENDED && rightJobState == 
JobState.RUNNING) {
+                return BlueGreenDiffType.RESUME;
+            }
+        }
+
         // Used in Case 2, 3 & 4: Delegate to ReflectiveDiffBuilder for nested 
spec comparison
         // Calculate diffResult before comparison to apply in-place removal of 
ignored fields
         DiffResult<FlinkDeploymentSpec> diffResult =
@@ -88,6 +102,19 @@ public class FlinkBlueGreenDeploymentSpecDiff {
         }
     }
 
+    /**
+     * Resolves the effective job state declared by a deployment spec.
+     *
+     * <p>A spec without a job section, or whose job declares no state, is treated as {@link
+     * JobState#RUNNING}.
+     *
+     * @param spec the FlinkDeploymentSpec to inspect
+     * @return the declared job state, or RUNNING when none is declared
+     */
+    private JobState getJobState(FlinkDeploymentSpec spec) {
+        var job = spec.getJob();
+        var declared = job == null ? null : job.getState();
+        return declared != null ? declared : JobState.RUNNING;
+    }
+
     /**
      * Validates that the specs and their nested components are not null. 
Throws
      * IllegalArgumentException if any required component is null.
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
index 11c1b931..bc571c5d 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
@@ -251,6 +251,168 @@ public class FlinkBlueGreenDeploymentControllerTest {
                 rs.reconciledStatus.getBlueGreenState());
     }
 
+    /**
+     * Suspends and then resumes the active blue child in place. Checks that no second child is
+     * created, that spec changes made together with the state flip are propagated, and that the
+     * blue/green resource passes through TRANSITIONING_TO_BLUE before settling on ACTIVE_BLUE.
+     */
+    @ParameterizedTest
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+    public void verifySuspendAndResumeInPlace(FlinkVersion flinkVersion) throws Exception {
+        var result = setupActiveBlueDeployment(flinkVersion);
+        assertEquals(1, getFlinkDeployments().size());
+        assertEquals(
+                FlinkBlueGreenDeploymentState.ACTIVE_BLUE,
+                result.reconciledStatus.getBlueGreenState(),
+                "Should start as ACTIVE_BLUE");
+
+        // --- Suspend: flip job.state and bump parallelism in the same update ---
+        var bgDeployment = result.deployment;
+        var jobSpec = bgDeployment.getSpec().getTemplate().getSpec().getJob();
+        jobSpec.setState(JobState.SUSPENDED);
+        jobSpec.setParallelism(5);
+        kubernetesClient.resource(bgDeployment).createOrReplace();
+
+        result = reconcile(bgDeployment);
+        bgDeployment = result.deployment;
+        // Suspension reuses the patch path, so BG is seen transitioning to the same color
+        assertEquals(
+                JobStatus.RECONCILING,
+                result.reconciledStatus.getJobStatus().getState(),
+                "BG status should be RECONCILING after suspend initiated");
+        assertEquals(
+                FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE,
+                result.reconciledStatus.getBlueGreenState(),
+                "Should go through TRANSITIONING_TO_BLUE during suspend");
+
+        var children = getFlinkDeployments();
+        assertEquals(1, children.size(), "Suspend should keep single child");
+        var blueChild = children.get(0);
+        var blueJob = blueChild.getSpec().getJob();
+        assertTrue(
+                blueChild.getMetadata().getName().endsWith("-blue"),
+                "Child should be blue deployment");
+        assertEquals(JobState.SUSPENDED, blueJob.getState(), "Child should be suspended");
+        assertEquals(5, blueJob.getParallelism(), "Spec change should be applied");
+
+        // lifecycleState is derived from lastReconciledSpec, so fake a completed suspend
+        simulateSuccessfulSuspend(blueChild);
+        result = reconcile(bgDeployment);
+        bgDeployment = result.deployment;
+        assertEquals(JobStatus.SUSPENDED, result.reconciledStatus.getJobStatus().getState());
+        assertEquals(
+                FlinkBlueGreenDeploymentState.ACTIVE_BLUE,
+                result.reconciledStatus.getBlueGreenState(),
+                "Should finalize to ACTIVE_BLUE when suspended");
+
+        // --- Resume: flip job.state back and bump parallelism again ---
+        jobSpec = bgDeployment.getSpec().getTemplate().getSpec().getJob();
+        jobSpec.setState(JobState.RUNNING);
+        jobSpec.setParallelism(6);
+        kubernetesClient.resource(bgDeployment).createOrReplace();
+
+        result = reconcile(bgDeployment);
+        bgDeployment = result.deployment;
+        assertEquals(
+                FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE,
+                result.reconciledStatus.getBlueGreenState(),
+                "Should go through TRANSITIONING_TO_BLUE during resume");
+        // Persist the BG status so the following reconcile starts from it
+        kubernetesClient.resource(bgDeployment).updateStatus();
+
+        children = getFlinkDeployments();
+        assertEquals(1, children.size(), "Resume should keep single child");
+        blueChild = children.get(0);
+        blueJob = blueChild.getSpec().getJob();
+        assertTrue(
+                blueChild.getMetadata().getName().endsWith("-blue"),
+                "Child should still be blue deployment");
+        assertEquals(JobState.RUNNING, blueJob.getState(), "Child should resume");
+        assertEquals(6, blueJob.getParallelism(), "Latest spec should be applied");
+
+        // Once the child reports RUNNING, the BG status should follow
+        simulateSuccessfulJobStart(blueChild);
+        result = reconcile(bgDeployment);
+        assertEquals(JobStatus.RUNNING, result.reconciledStatus.getJobStatus().getState());
+        assertEquals(
+                FlinkBlueGreenDeploymentState.ACTIVE_BLUE,
+                result.reconciledStatus.getBlueGreenState(),
+                "Should finalize to ACTIVE_BLUE after resume");
+    }
+
+    /**
+     * Requests a suspend while a blue-to-green transition is in flight. Checks that the suspend is
+     * deferred until the transition completes, after which the green child is suspended and the
+     * blue/green resource settles on ACTIVE_GREEN with a SUSPENDED job status.
+     */
+    @ParameterizedTest
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+    public void verifySuspendDuringTransitionIsBlockedUntilComplete(FlinkVersion flinkVersion)
+            throws Exception {
+        // STATELESS skips savepointing. The long abort grace period keeps the transition from
+        // being aborted mid-test, and a zero deletion delay avoids timing-based flakiness when
+        // the transition completes. Both are set before the initial deployment.
+        var bgDeployment =
+                buildSessionCluster(
+                        TEST_DEPLOYMENT_NAME,
+                        TEST_NAMESPACE,
+                        flinkVersion,
+                        null,
+                        UpgradeMode.STATELESS);
+        var config = bgDeployment.getSpec().getConfiguration();
+        config.put(ABORT_GRACE_PERIOD.key(), "60000");
+        config.put(DEPLOYMENT_DELETION_DELAY.key(), "0");
+        var result = executeBasicDeployment(flinkVersion, bgDeployment, false, null);
+
+        // --- Start a transition to green by changing parallelism ---
+        result.deployment.getSpec().getTemplate().getSpec().getJob().setParallelism(5);
+        kubernetesClient.resource(result.deployment).createOrReplace();
+        result = reconcile(result.deployment);
+        assertEquals(2, getFlinkDeployments().size());
+
+        // --- Request a suspend mid-transition: it must be held back ---
+        result.deployment.getSpec().getTemplate().getSpec().getJob().setState(JobState.SUSPENDED);
+        kubernetesClient.resource(result.deployment).createOrReplace();
+        result = reconcile(result.deployment);
+
+        var green =
+                getFlinkDeployments().stream()
+                        .filter(fd -> fd.getMetadata().getName().endsWith("-green"))
+                        .findFirst()
+                        .orElseThrow();
+        assertEquals(JobState.RUNNING, green.getSpec().getJob().getState());
+
+        // --- Let green become ready, then reconcile enough passes to delete blue, finalize
+        // ACTIVE_GREEN and pick up the deferred suspend via TRANSITIONING_TO_GREEN ---
+        simulateSuccessfulJobStart(green);
+        for (int pass = 1; pass <= 5; pass++) {
+            result = reconcile(result.deployment);
+        }
+        // Persist the BG status so the following reconcile starts from it
+        kubernetesClient.resource(result.deployment).updateStatus();
+
+        green =
+                getFlinkDeployments().stream()
+                        .filter(fd -> fd.getMetadata().getName().endsWith("-green"))
+                        .findFirst()
+                        .orElseThrow();
+        assertEquals(JobState.SUSPENDED, green.getSpec().getJob().getState());
+
+        // lifecycleState is derived from lastReconciledSpec, so fake a completed suspend; the
+        // next reconcile should finalize the BG job status to SUSPENDED
+        simulateSuccessfulSuspend(green);
+        result = reconcile(result.deployment);
+        assertEquals(JobStatus.SUSPENDED, result.reconciledStatus.getJobStatus().getState());
+        assertEquals(
+                FlinkBlueGreenDeploymentState.ACTIVE_GREEN,
+                result.reconciledStatus.getBlueGreenState());
+    }
+
     @ParameterizedTest
     
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
     public void verifyFailureBeforeTransition(FlinkVersion flinkVersion) 
throws Exception {
@@ -654,6 +816,76 @@ public class FlinkBlueGreenDeploymentControllerTest {
         assertEquals(1, flinkDeployments.size());
     }
 
+    @ParameterizedTest
+    
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+    public void 
verifySuspendWhenChildNotReadyPatchesWithoutFailing(FlinkVersion flinkVersion)
+            throws Exception {
+        var rs = setupActiveBlueDeployment(flinkVersion);
+
+        // Mark the active child as not ready
+        var child = getFlinkDeployments().get(0);
+        child.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
+        kubernetesClient.resource(child).update();
+
+        // Suspend and bump parallelism
+        var deployment = rs.deployment;
+        
deployment.getSpec().getTemplate().getSpec().getJob().setState(JobState.SUSPENDED);
+        
deployment.getSpec().getTemplate().getSpec().getJob().setParallelism(9);
+        kubernetesClient.resource(deployment).createOrReplace();
+
+        rs = reconcile(deployment);
+
+        var flinkDeployments = getFlinkDeployments();
+        assertEquals(1, flinkDeployments.size(), "Should not create new 
child");
+        child = flinkDeployments.get(0);
+        assertEquals(JobState.SUSPENDED, child.getSpec().getJob().getState());
+        assertEquals(9, child.getSpec().getJob().getParallelism(), "Spec 
change should apply");
+        assertNotEquals(
+                JobStatus.FAILING,
+                rs.reconciledStatus.getJobStatus().getState(),
+                "Should not mark parent failing when suspending not-ready 
child");
+    }
+
+    /**
+     * Creates a blue/green resource whose job starts SUSPENDED. Checks that no child is deployed
+     * and that the job status reads SUSPENDED. It then flips the job to RUNNING and checks that
+     * the first child gets created.
+     */
+    @ParameterizedTest
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+    public void verifyInitialSuspendedIsBlockedThenDeploysOnRunning(FlinkVersion flinkVersion)
+            throws Exception {
+        var bgDeployment =
+                buildSessionCluster(
+                        TEST_DEPLOYMENT_NAME,
+                        TEST_NAMESPACE,
+                        flinkVersion,
+                        null,
+                        UpgradeMode.STATELESS);
+        bgDeployment.getSpec().getTemplate().getSpec().getJob().setState(JobState.SUSPENDED);
+        kubernetesClient.resource(bgDeployment).createOrReplace();
+
+        // Pass 1 initializes the status (INITIALIZING_BLUE); pass 2 hits the SUSPENDED guard
+        var result = reconcile(bgDeployment);
+        result = reconcile(result.deployment);
+
+        assertEquals(0, getFlinkDeployments().size(), "No child should be created");
+        assertEquals(
+                JobStatus.SUSPENDED,
+                result.reconciledStatus.getJobStatus().getState(),
+                "Job status should be SUSPENDED when initial deployment is blocked");
+
+        // Switching to RUNNING should unblock the initial deployment
+        var resumed = result.deployment;
+        resumed.getSpec().getTemplate().getSpec().getJob().setState(JobState.RUNNING);
+        kubernetesClient.resource(resumed).createOrReplace();
+
+        result = reconcile(resumed);
+
+        assertEquals(1, getFlinkDeployments().size(), "Child should be created after fix");
+        assertEquals(
+                JobStatus.RECONCILING,
+                result.reconciledStatus.getJobStatus().getState(),
+                "Job status should be RECONCILING after deployment initiated");
+    }
+
     // ==================== Parameterized Test Inputs ====================
 
     static Stream<Arguments> savepointErrorProvider() {
@@ -1199,6 +1431,16 @@ public class FlinkBlueGreenDeploymentControllerTest {
         kubernetesClient.resource(deployment).update();
     }
 
+    /**
+     * Fakes the operator having finished suspending {@code deployment}. It sets the job status to
+     * FINISHED and marks reconciliation DEPLOYED. It then records the current spec as the last
+     * reconciled spec (the lifecycle state is computed from it) and persists the resource.
+     */
+    private void simulateSuccessfulSuspend(FlinkDeployment deployment) {
+        var status = deployment.getStatus();
+        var reconciliation = status.getReconciliationStatus();
+
+        status.getJobStatus().setState(JobStatus.FINISHED);
+        reconciliation.setState(ReconciliationState.DEPLOYED);
+        reconciliation.serializeAndSetLastReconciledSpec(deployment.getSpec(), deployment);
+
+        kubernetesClient.resource(deployment).update();
+    }
+
     private void simulateJobFailure(FlinkDeployment deployment) {
         deployment.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
         
deployment.getStatus().getReconciliationStatus().setState(ReconciliationState.UPGRADING);
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiffTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiffTest.java
index 272730a0..1e60b83f 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiffTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiffTest.java
@@ -134,6 +134,30 @@ public class FlinkBlueGreenDeploymentSpecDiffTest {
         assertEquals(BlueGreenDiffType.IGNORE, diff.compare());
     }
 
+    /** A RUNNING -> SUSPENDED change of job.state must be classified as SUSPEND. */
+    @Test
+    public void testSuspendOnJobStateChange() {
+        // createBasicSpec() yields a RUNNING job by default
+        var runningSpec = createBasicSpec();
+        var suspendedSpec = createBasicSpec();
+        suspendedSpec.getTemplate().getSpec().getJob().setState(JobState.SUSPENDED);
+
+        var result =
+                new FlinkBlueGreenDeploymentSpecDiff(DEPLOYMENT_MODE, runningSpec, suspendedSpec)
+                        .compare();
+
+        assertEquals(BlueGreenDiffType.SUSPEND, result);
+    }
+
+    /** A SUSPENDED -> RUNNING change of job.state must be classified as RESUME. */
+    @Test
+    public void testResumeOnJobStateChange() {
+        var suspendedSpec = createBasicSpec();
+        suspendedSpec.getTemplate().getSpec().getJob().setState(JobState.SUSPENDED);
+        // createBasicSpec() yields a RUNNING job by default
+        var runningSpec = createBasicSpec();
+
+        var result =
+                new FlinkBlueGreenDeploymentSpecDiff(DEPLOYMENT_MODE, suspendedSpec, runningSpec)
+                        .compare();
+
+        assertEquals(BlueGreenDiffType.RESUME, result);
+    }
+
     @Test
     public void testIgnoreForRootPodTemplateAdditionalProps() {
         FlinkBlueGreenDeploymentSpec spec1 = createBasicSpec();

Reply via email to