This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 2621c9ad [FLINK-38915] FlinkBlueGreenDeployment in-place suspension
handler
2621c9ad is described below
commit 2621c9ad0884b8610caa2b15c62dbd10a26a6a53
Author: James Kan <[email protected]>
AuthorDate: Tue Jan 13 19:52:19 2026 -0800
[FLINK-38915] FlinkBlueGreenDeployment in-place suspension handler
---
.../operator/api/bluegreen/BlueGreenDiffType.java | 14 +-
.../bluegreen/BlueGreenDeploymentService.java | 84 ++++++-
.../handlers/InitializingBlueStateHandler.java | 11 +
.../diff/FlinkBlueGreenDeploymentSpecDiff.java | 27 +++
.../FlinkBlueGreenDeploymentControllerTest.java | 242 +++++++++++++++++++++
.../diff/FlinkBlueGreenDeploymentSpecDiffTest.java | 24 ++
6 files changed, 398 insertions(+), 4 deletions(-)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/BlueGreenDiffType.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/BlueGreenDiffType.java
index e456a891..97a83ec2 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/BlueGreenDiffType.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/BlueGreenDiffType.java
@@ -32,5 +32,17 @@ public enum BlueGreenDiffType {
* Full redeploy from user-specified savepoint. Triggered when
savepointRedeployNonce changes.
* Uses the initialSavepointPath from spec instead of taking a new
savepoint.
*/
- SAVEPOINT_REDEPLOY
+ SAVEPOINT_REDEPLOY,
+
+ /**
+ * In-place suspension. Triggered when job.state changes from RUNNING to
SUSPENDED. Suspends the
+ * currently active child without creating a new deployment.
+ */
+ SUSPEND,
+
+ /**
+ * Resume from suspension. Triggered when job.state changes from SUSPENDED
to RUNNING. Spins up
+ * the child with the current (potentially updated) spec.
+ */
+ RESUME
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
index 132b05e2..33cd868b 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java
@@ -22,6 +22,7 @@ import
org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import
org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType;
import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDiffType;
+import
org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
import
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
@@ -110,13 +111,38 @@ public class BlueGreenDeploymentService {
*/
public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
BlueGreenContext context, BlueGreenDeploymentType
currentBlueGreenDeploymentType) {
+
BlueGreenDiffType specDiff = getSpecDiff(context);
if (specDiff != BlueGreenDiffType.IGNORE) {
FlinkDeployment currentFlinkDeployment =
context.getDeploymentByType(currentBlueGreenDeploymentType);
- if (isFlinkDeploymentReady(currentFlinkDeployment)) {
+ if (specDiff == BlueGreenDiffType.SUSPEND &&
currentFlinkDeployment != null) {
+ setLastReconciledSpec(context);
+ LOG.info(
+ "In-place suspension for '{}'",
+ currentFlinkDeployment.getMetadata().getName());
+ return patchFlinkDeployment(context,
currentBlueGreenDeploymentType);
+ }
+
+ if (specDiff == BlueGreenDiffType.RESUME && currentFlinkDeployment
!= null) {
+ setLastReconciledSpec(context);
+ LOG.info(
+ "In-place resume for '{}'",
currentFlinkDeployment.getMetadata().getName());
+ return patchFlinkDeployment(context,
currentBlueGreenDeploymentType);
+ }
+
+ // Check if child is currently suspended - if so, just patch specs
without restart
+ if (isChildSuspended(currentFlinkDeployment)) {
+ setLastReconciledSpec(context);
+ LOG.info(
+ "Spec change while suspended for '{}'",
+ currentFlinkDeployment.getMetadata().getName());
+ return patchFlinkDeployment(context,
currentBlueGreenDeploymentType);
+ }
+
+ if (currentFlinkDeployment != null &&
isFlinkDeploymentReady(currentFlinkDeployment)) {
if (specDiff == BlueGreenDiffType.TRANSITION) {
boolean savepointTriggered = false;
try {
@@ -173,12 +199,16 @@ public class BlueGreenDeploymentService {
} else {
if (context.getDeploymentStatus().getJobStatus().getState() !=
JobStatus.FAILING) {
setLastReconciledSpec(context);
+ var childName =
+ currentFlinkDeployment != null
+ ?
currentFlinkDeployment.getMetadata().getName()
+ : "null";
var error =
String.format(
"Transition to %s not possible, current
Flink Deployment '%s' is not READY. FAILING '%s'",
calculateTransition(currentBlueGreenDeploymentType)
.nextBlueGreenDeploymentType,
-
currentFlinkDeployment.getMetadata().getName(),
+ childName,
context.getBgDeployment().getMetadata().getName());
return markDeploymentFailing(context, error);
}
@@ -188,6 +218,16 @@ public class BlueGreenDeploymentService {
return UpdateControl.noUpdate();
}
+    /**
+     * Checks whether a child FlinkDeployment's desired spec requests {@code job.state =
+     * SUSPENDED}. A null deployment, a null spec or a missing job section is reported as not
+     * suspended. Only the spec is inspected here, not the observed lifecycle state.
+     */
+    private boolean isChildSuspended(FlinkDeployment deployment) {
+        var spec = deployment == null ? null : deployment.getSpec();
+        var job = spec == null ? null : spec.getJob();
+        if (job == null) {
+            return false;
+        }
+        return job.getState() == org.apache.flink.kubernetes.operator.api.spec.JobState.SUSPENDED;
+    }
+
private UpdateControl<FlinkBlueGreenDeployment> patchFlinkDeployment(
BlueGreenContext context, BlueGreenDeploymentType
blueGreenDeploymentTypeToPatch) {
return patchFlinkDeployment(context, blueGreenDeploymentTypeToPatch,
true);
@@ -435,6 +475,16 @@ public class BlueGreenDeploymentService {
TransitionState transitionState =
determineTransitionState(context,
currentBlueGreenDeploymentType);
+ if (isChildSuspended(transitionState.nextDeployment)) {
+ if (transitionState.nextDeployment.getStatus().getLifecycleState()
+ == ResourceLifecycleState.SUSPENDED) {
+ return finalizeSuspendedDeployment(context,
transitionState.nextState);
+ } else {
+ return shouldWeAbort(
+ context, transitionState.nextDeployment,
transitionState.nextState);
+ }
+ }
+
if (isFlinkDeploymentReady(transitionState.nextDeployment)) {
return shouldWeDelete(
context,
@@ -447,11 +497,36 @@ public class BlueGreenDeploymentService {
}
}
+    /**
+     * Finalizes an in-place suspension. Intended for the case where the child's lifecycle state
+     * has already reached SUSPENDED.
+     *
+     * <p>The method first clears the transition bookkeeping: the ready and abort timestamps are
+     * reset to epoch 0 and any pending savepoint trigger id is dropped. It then patches the parent
+     * status to {@code nextState} with job status SUSPENDED via {@code patchStatusUpdateControl}.
+     * Finally it requests an immediate reschedule.
+     *
+     * @param context the blue/green reconciliation context
+     * @param nextState the blue/green state the parent should settle in
+     * @return a status-patch update control that reschedules immediately
+     */
+    private UpdateControl<FlinkBlueGreenDeployment> finalizeSuspendedDeployment(
+            BlueGreenContext context, FlinkBlueGreenDeploymentState nextState) {
+        LOG.info(
+                "Finalizing suspended deployment '{}' to {} state",
+                context.getDeploymentName(),
+                nextState);
+
+        var status = context.getDeploymentStatus();
+        status.setDeploymentReadyTimestamp(millisToInstantStr(0));
+        status.setAbortTimestamp(millisToInstantStr(0));
+        status.setSavepointTriggerId(null);
+
+        var control = patchStatusUpdateControl(context, nextState, JobStatus.SUSPENDED, null);
+        return control.rescheduleAfter(0);
+    }
+
private UpdateControl<FlinkBlueGreenDeployment>
handleSpecChangesDuringTransition(
BlueGreenContext context, BlueGreenDeploymentType
currentBlueGreenDeploymentType) {
if (hasSpecChanged(context)) {
BlueGreenDiffType diffType = getSpecDiff(context);
+ // Block SUSPEND during transition - wait for transition to
complete first
+ if (diffType == BlueGreenDiffType.SUSPEND) {
+ LOG.info(
+ "Suspend requested during transition for '{}'. "
+ + "Waiting for transition to complete before
processing suspend.",
+ context.getBgDeployment().getMetadata().getName());
+ return null;
+ }
+
if (diffType != BlueGreenDiffType.IGNORE) {
setLastReconciledSpec(context);
var oppositeDeploymentType =
@@ -658,7 +733,10 @@ public class BlueGreenDeploymentService {
context.getDeploymentStatus().setAbortTimestamp(millisToInstantStr(0));
context.getDeploymentStatus().setSavepointTriggerId(null);
- return patchStatusUpdateControl(context, nextState, JobStatus.RUNNING,
null);
+ // Finalize status and reschedule immediately so any pending spec
changes
+ // (e.g., suspend requested during transition) are picked up on next
reconcile
+ return patchStatusUpdateControl(context, nextState, JobStatus.RUNNING,
null)
+ .rescheduleAfter(0);
}
// ==================== Common Utility Methods ====================
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/InitializingBlueStateHandler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/InitializingBlueStateHandler.java
index f2882d46..8319314f 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/InitializingBlueStateHandler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/InitializingBlueStateHandler.java
@@ -20,6 +20,7 @@ package
org.apache.flink.kubernetes.operator.controller.bluegreen.handlers;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
import
org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType;
+import org.apache.flink.kubernetes.operator.api.spec.JobState;
import
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
import
org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
import
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
@@ -27,6 +28,7 @@ import
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploy
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import static
org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploymentService.patchStatusUpdateControl;
import static
org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.hasSpecChanged;
import static
org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.setLastReconciledSpec;
@@ -41,6 +43,15 @@ public class InitializingBlueStateHandler extends
AbstractBlueGreenStateHandler
public UpdateControl<FlinkBlueGreenDeployment> handle(BlueGreenContext
context) {
FlinkBlueGreenDeploymentStatus deploymentStatus =
context.getDeploymentStatus();
+ // Block initial deployment if job.state is SUSPENDED - user must
start with RUNNING
+ var jobSpec =
context.getBgDeployment().getSpec().getTemplate().getSpec().getJob();
+ if (jobSpec != null && jobSpec.getState() == JobState.SUSPENDED) {
+ LOG.info(
+ "Blocking initial deployment '{}' - job.state is
SUSPENDED, waiting for RUNNING",
+ context.getBgDeployment().getMetadata().getName());
+ return patchStatusUpdateControl(context, null,
JobStatus.SUSPENDED, null);
+ }
+
// Deploy only if this is the initial deployment (no previous spec
exists)
// or if we're recovering from a failure and the spec has changed
since the last attempt
if (deploymentStatus.getLastReconciledSpec() == null
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiff.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiff.java
index a43f3168..3e5dba77 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiff.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiff.java
@@ -22,6 +22,7 @@ import
org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDiffType;
import org.apache.flink.kubernetes.operator.api.diff.DiffType;
import
org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
+import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
import lombok.NonNull;
@@ -59,6 +60,19 @@ public class FlinkBlueGreenDeploymentSpecDiff {
FlinkDeploymentSpec leftSpec = left.getTemplate().getSpec();
FlinkDeploymentSpec rightSpec = right.getTemplate().getSpec();
+ // Check for suspend/resume state changes first - these take highest
precedence
+ // for in-place suspension handling
+ JobState leftJobState = getJobState(leftSpec);
+ JobState rightJobState = getJobState(rightSpec);
+
+ if (leftJobState != rightJobState) {
+ if (rightJobState == JobState.SUSPENDED) {
+ return BlueGreenDiffType.SUSPEND;
+ } else if (leftJobState == JobState.SUSPENDED && rightJobState ==
JobState.RUNNING) {
+ return BlueGreenDiffType.RESUME;
+ }
+ }
+
// Used in Case 2, 3 & 4: Delegate to ReflectiveDiffBuilder for nested
spec comparison
// Calculate diffResult before comparison to apply in-place removal of
ignored fields
DiffResult<FlinkDeploymentSpec> diffResult =
@@ -88,6 +102,19 @@ public class FlinkBlueGreenDeploymentSpecDiff {
}
}
+    /**
+     * Resolves the effective job state of a deployment spec. A missing {@code job} section or
+     * an unset {@code state} is treated as {@link JobState#RUNNING}.
+     *
+     * @param spec the FlinkDeploymentSpec to inspect
+     * @return the configured job state, or RUNNING when the job or its state is null
+     */
+    private JobState getJobState(FlinkDeploymentSpec spec) {
+        var job = spec.getJob();
+        JobState state = job == null ? null : job.getState();
+        return state != null ? state : JobState.RUNNING;
+    }
+
/**
* Validates that the specs and their nested components are not null.
Throws
* IllegalArgumentException if any required component is null.
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
index 11c1b931..bc571c5d 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java
@@ -251,6 +251,168 @@ public class FlinkBlueGreenDeploymentControllerTest {
rs.reconciledStatus.getBlueGreenState());
}
+    @ParameterizedTest
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+    public void verifySuspendAndResumeInPlace(FlinkVersion flinkVersion) throws Exception {
+        // Starting point: a single child, parent in ACTIVE_BLUE.
+        var result = setupActiveBlueDeployment(flinkVersion);
+        assertEquals(1, getFlinkDeployments().size());
+        assertEquals(
+                FlinkBlueGreenDeploymentState.ACTIVE_BLUE,
+                result.reconciledStatus.getBlueGreenState(),
+                "Should start as ACTIVE_BLUE");
+
+        // Suspend phase: request SUSPENDED together with a parallelism bump.
+        var bgDeployment = result.deployment;
+        var bgJob = bgDeployment.getSpec().getTemplate().getSpec().getJob();
+        bgJob.setState(JobState.SUSPENDED);
+        bgJob.setParallelism(5);
+        kubernetesClient.resource(bgDeployment).createOrReplace();
+
+        // The existing child is patched in place and the parent goes through
+        // TRANSITIONING_TO_BLUE (unified patch mechanism).
+        result = reconcile(bgDeployment);
+        bgDeployment = result.deployment;
+        assertEquals(
+                JobStatus.RECONCILING,
+                result.reconciledStatus.getJobStatus().getState(),
+                "BG status should be RECONCILING after suspend initiated");
+        assertEquals(
+                FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE,
+                result.reconciledStatus.getBlueGreenState(),
+                "Should go through TRANSITIONING_TO_BLUE during suspend");
+
+        var children = getFlinkDeployments();
+        assertEquals(1, children.size(), "Suspend should keep single child");
+        var blueChild = children.get(0);
+        assertTrue(
+                blueChild.getMetadata().getName().endsWith("-blue"),
+                "Child should be blue deployment");
+        assertEquals(
+                JobState.SUSPENDED,
+                blueChild.getSpec().getJob().getState(),
+                "Child should be suspended");
+        assertEquals(
+                5, blueChild.getSpec().getJob().getParallelism(), "Spec change should be applied");
+
+        // The child reports the suspension (lifecycleState is computed from lastReconciledSpec).
+        // The parent then settles on ACTIVE_BLUE with job status SUSPENDED.
+        simulateSuccessfulSuspend(blueChild);
+        result = reconcile(bgDeployment);
+        bgDeployment = result.deployment;
+        assertEquals(JobStatus.SUSPENDED, result.reconciledStatus.getJobStatus().getState());
+        assertEquals(
+                FlinkBlueGreenDeploymentState.ACTIVE_BLUE,
+                result.reconciledStatus.getBlueGreenState(),
+                "Should finalize to ACTIVE_BLUE when suspended");
+
+        // Resume phase: request RUNNING together with another spec change.
+        bgJob = bgDeployment.getSpec().getTemplate().getSpec().getJob();
+        bgJob.setState(JobState.RUNNING);
+        bgJob.setParallelism(6);
+        kubernetesClient.resource(bgDeployment).createOrReplace();
+
+        result = reconcile(bgDeployment);
+        bgDeployment = result.deployment;
+        assertEquals(
+                FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE,
+                result.reconciledStatus.getBlueGreenState(),
+                "Should go through TRANSITIONING_TO_BLUE during resume");
+        // Persist the BG status update to kubernetes for the next reconcile
+        kubernetesClient.resource(bgDeployment).updateStatus();
+
+        children = getFlinkDeployments();
+        assertEquals(1, children.size(), "Resume should keep single child");
+        blueChild = children.get(0);
+        assertTrue(
+                blueChild.getMetadata().getName().endsWith("-blue"),
+                "Child should still be blue deployment");
+        assertEquals(
+                JobState.RUNNING, blueChild.getSpec().getJob().getState(), "Child should resume");
+        assertEquals(
+                6, blueChild.getSpec().getJob().getParallelism(), "Latest spec should be applied");
+
+        // Once the child is running again, the parent reports RUNNING on ACTIVE_BLUE.
+        simulateSuccessfulJobStart(blueChild);
+        result = reconcile(bgDeployment);
+        assertEquals(JobStatus.RUNNING, result.reconciledStatus.getJobStatus().getState());
+        assertEquals(
+                FlinkBlueGreenDeploymentState.ACTIVE_BLUE,
+                result.reconciledStatus.getBlueGreenState(),
+                "Should finalize to ACTIVE_BLUE after resume");
+    }
+
+    @ParameterizedTest
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+    public void verifySuspendDuringTransitionIsBlockedUntilComplete(FlinkVersion flinkVersion)
+            throws Exception {
+        // The blue session cluster is configured as follows:
+        // - STATELESS upgrade mode, to skip savepointing.
+        // - A long abort grace period, so the transition is not aborted mid-test.
+        // - Zero deletion delay, so completing the transition is not timing-dependent.
+        // Both settings must be in place before the initial deployment.
+        var initialBg =
+                buildSessionCluster(
+                        TEST_DEPLOYMENT_NAME,
+                        TEST_NAMESPACE,
+                        flinkVersion,
+                        null,
+                        UpgradeMode.STATELESS);
+        var configuration = initialBg.getSpec().getConfiguration();
+        configuration.put(ABORT_GRACE_PERIOD.key(), "60000");
+        configuration.put(DEPLOYMENT_DELETION_DELAY.key(), "0");
+        var result = executeBasicDeployment(flinkVersion, initialBg, false, null);
+
+        // Trigger a regular blue -> green transition through a parallelism change.
+        result.deployment.getSpec().getTemplate().getSpec().getJob().setParallelism(5);
+        kubernetesClient.resource(result.deployment).createOrReplace();
+        result = reconcile(result.deployment);
+        assertEquals(2, getFlinkDeployments().size());
+
+        // Request SUSPENDED while the transition is still in flight. The suspend must be
+        // deferred, so the freshly created green child keeps running.
+        result.deployment.getSpec().getTemplate().getSpec().getJob().setState(JobState.SUSPENDED);
+        kubernetesClient.resource(result.deployment).createOrReplace();
+        result = reconcile(result.deployment);
+
+        var green =
+                getFlinkDeployments().stream()
+                        .filter(fd -> fd.getMetadata().getName().endsWith("-green"))
+                        .findFirst()
+                        .orElseThrow();
+        assertEquals(JobState.RUNNING, green.getSpec().getJob().getState());
+
+        // Let green become ready. Then reconcile several times so the following happens in
+        // order: the timestamp is set, BLUE is deleted, the parent finalizes to ACTIVE_GREEN,
+        // and the pending suspend is processed via TRANSITIONING_TO_GREEN.
+        simulateSuccessfulJobStart(green);
+        for (int pass = 0; pass < 5; pass++) {
+            result = reconcile(result.deployment);
+        }
+        // Persist the BG status update to kubernetes for the next reconcile
+        kubernetesClient.resource(result.deployment).updateStatus();
+
+        // The deferred suspend has now been patched onto green's spec.
+        green =
+                getFlinkDeployments().stream()
+                        .filter(fd -> fd.getMetadata().getName().endsWith("-green"))
+                        .findFirst()
+                        .orElseThrow();
+        assertEquals(JobState.SUSPENDED, green.getSpec().getJob().getState());
+
+        // After green reports the suspension (lifecycleState is computed from
+        // lastReconciledSpec), the parent finalizes on ACTIVE_GREEN with job status SUSPENDED.
+        simulateSuccessfulSuspend(green);
+        result = reconcile(result.deployment);
+        assertEquals(JobStatus.SUSPENDED, result.reconciledStatus.getJobStatus().getState());
+        assertEquals(
+                FlinkBlueGreenDeploymentState.ACTIVE_GREEN,
+                result.reconciledStatus.getBlueGreenState());
+    }
+
@ParameterizedTest
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
public void verifyFailureBeforeTransition(FlinkVersion flinkVersion)
throws Exception {
@@ -654,6 +816,76 @@ public class FlinkBlueGreenDeploymentControllerTest {
assertEquals(1, flinkDeployments.size());
}
+ @ParameterizedTest
+
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+ public void
verifySuspendWhenChildNotReadyPatchesWithoutFailing(FlinkVersion flinkVersion)
+ throws Exception {
+ var rs = setupActiveBlueDeployment(flinkVersion);
+
+ // Mark the active child as not ready
+ var child = getFlinkDeployments().get(0);
+ child.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
+ kubernetesClient.resource(child).update();
+
+ // Suspend and bump parallelism
+ var deployment = rs.deployment;
+
deployment.getSpec().getTemplate().getSpec().getJob().setState(JobState.SUSPENDED);
+
deployment.getSpec().getTemplate().getSpec().getJob().setParallelism(9);
+ kubernetesClient.resource(deployment).createOrReplace();
+
+ rs = reconcile(deployment);
+
+ var flinkDeployments = getFlinkDeployments();
+ assertEquals(1, flinkDeployments.size(), "Should not create new
child");
+ child = flinkDeployments.get(0);
+ assertEquals(JobState.SUSPENDED, child.getSpec().getJob().getState());
+ assertEquals(9, child.getSpec().getJob().getParallelism(), "Spec
change should apply");
+ assertNotEquals(
+ JobStatus.FAILING,
+ rs.reconciledStatus.getJobStatus().getState(),
+ "Should not mark parent failing when suspending not-ready
child");
+ }
+
+    /**
+     * A parent created with {@code job.state = SUSPENDED} must not deploy any child. The parent
+     * reports job status SUSPENDED instead. Switching {@code job.state} to RUNNING must then let
+     * the initial deployment proceed.
+     */
+    @ParameterizedTest
+    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
+    public void verifyInitialSuspendedIsBlockedThenDeploysOnRunning(FlinkVersion flinkVersion)
+            throws Exception {
+        var bg =
+                buildSessionCluster(
+                        TEST_DEPLOYMENT_NAME,
+                        TEST_NAMESPACE,
+                        flinkVersion,
+                        null,
+                        UpgradeMode.STATELESS);
+        bg.getSpec().getTemplate().getSpec().getJob().setState(JobState.SUSPENDED);
+        kubernetesClient.resource(bg).createOrReplace();
+
+        // First reconcile initializes status to INITIALIZING_BLUE
+        var rs = reconcile(bg);
+        // Second reconcile: handler detects SUSPENDED and blocks
+        rs = reconcile(rs.deployment);
+
+        // Block child creation when initial state is SUSPENDED
+        assertEquals(0, getFlinkDeployments().size(), "No child should be created");
+        assertEquals(
+                JobStatus.SUSPENDED,
+                rs.reconciledStatus.getJobStatus().getState(),
+                "Job status should be SUSPENDED when initial deployment is blocked");
+
+        // Switch job.state to RUNNING. The blocked initial deployment should now proceed.
+        bg = rs.deployment;
+        bg.getSpec().getTemplate().getSpec().getJob().setState(JobState.RUNNING);
+        kubernetesClient.resource(bg).createOrReplace();
+
+        rs = reconcile(bg);
+
+        assertEquals(
+                1,
+                getFlinkDeployments().size(),
+                "Child should be created once job.state is switched to RUNNING");
+        assertEquals(
+                JobStatus.RECONCILING,
+                rs.reconciledStatus.getJobStatus().getState(),
+                "Job status should be RECONCILING after deployment initiated");
+    }
+
// ==================== Parameterized Test Inputs ====================
static Stream<Arguments> savepointErrorProvider() {
@@ -1199,6 +1431,16 @@ public class FlinkBlueGreenDeploymentControllerTest {
kubernetesClient.resource(deployment).update();
}
+    /**
+     * Simulates a child deployment that has finished suspending. The job status is set to
+     * FINISHED and the reconciliation state to DEPLOYED. The current spec is recorded as the
+     * last reconciled spec, since callers rely on lifecycleState being computed from it. The
+     * updated object is then written back through {@code kubernetesClient}.
+     */
+    private void simulateSuccessfulSuspend(FlinkDeployment deployment) {
+        var status = deployment.getStatus();
+        status.getJobStatus().setState(JobStatus.FINISHED);
+        var reconciliationStatus = status.getReconciliationStatus();
+        reconciliationStatus.setState(ReconciliationState.DEPLOYED);
+        reconciliationStatus.serializeAndSetLastReconciledSpec(deployment.getSpec(), deployment);
+        kubernetesClient.resource(deployment).update();
+    }
+
private void simulateJobFailure(FlinkDeployment deployment) {
deployment.getStatus().getJobStatus().setState(JobStatus.RECONCILING);
deployment.getStatus().getReconciliationStatus().setState(ReconciliationState.UPGRADING);
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiffTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiffTest.java
index 272730a0..1e60b83f 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiffTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/diff/FlinkBlueGreenDeploymentSpecDiffTest.java
@@ -134,6 +134,30 @@ public class FlinkBlueGreenDeploymentSpecDiffTest {
assertEquals(BlueGreenDiffType.IGNORE, diff.compare());
}
+    @Test
+    public void testSuspendOnJobStateChange() {
+        // The left side keeps the default (RUNNING) job state. The right side requests SUSPENDED.
+        FlinkBlueGreenDeploymentSpec runningSpec = createBasicSpec();
+        FlinkBlueGreenDeploymentSpec suspendedSpec = createBasicSpec();
+        suspendedSpec.getTemplate().getSpec().getJob().setState(JobState.SUSPENDED);
+
+        BlueGreenDiffType diffType =
+                new FlinkBlueGreenDeploymentSpecDiff(DEPLOYMENT_MODE, runningSpec, suspendedSpec)
+                        .compare();
+
+        assertEquals(BlueGreenDiffType.SUSPEND, diffType);
+    }
+
+    @Test
+    public void testResumeOnJobStateChange() {
+        // The left side is SUSPENDED. The right side falls back to the default (RUNNING) job state.
+        FlinkBlueGreenDeploymentSpec suspendedSpec = createBasicSpec();
+        suspendedSpec.getTemplate().getSpec().getJob().setState(JobState.SUSPENDED);
+        FlinkBlueGreenDeploymentSpec runningSpec = createBasicSpec();
+
+        BlueGreenDiffType diffType =
+                new FlinkBlueGreenDeploymentSpecDiff(DEPLOYMENT_MODE, suspendedSpec, runningSpec)
+                        .compare();
+
+        assertEquals(BlueGreenDiffType.RESUME, diffType);
+    }
+
@Test
public void testIgnoreForRootPodTemplateAdditionalProps() {
FlinkBlueGreenDeploymentSpec spec1 = createBasicSpec();