This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 42737fc6 [FLINK-39618] FlinkDeployment deletion deadlocks when
FlinkSessionJobs are running with default block-on-* options
42737fc6 is described below
commit 42737fc60248e9337fea4f5888355bdbcba13c26
Author: Dennis-Mircea Ciupitu <[email protected]>
AuthorDate: Wed May 6 21:41:43 2026 +0300
[FLINK-39618] FlinkDeployment deletion deadlocks when FlinkSessionJobs are
running with default block-on-* options
---
.../sessionjob/SessionJobReconciler.java | 18 +++++--
.../flink/kubernetes/operator/TestUtils.java | 18 +++++++
.../sessionjob/SessionJobReconcilerTest.java | 58 ++++++++++++++++++++++
3 files changed, 89 insertions(+), 5 deletions(-)
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index 974c366b..4491ab97 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -158,12 +158,20 @@ public class SessionJobReconciler
var flinkDep = flinkDepOptional.get();
// If the session cluster is being deleted, the job will not survive
regardless,
- // so there is no need to explicitly cancel it.
+ // so there is no need to explicitly cancel it, unless
BLOCK_ON_SESSION_JOBS is
+ // enabled.
var sessionLifecycleState = flinkDep.getStatus().getLifecycleState();
if (sessionLifecycleState == ResourceLifecycleState.DELETING
|| sessionLifecycleState == ResourceLifecycleState.DELETED) {
- LOG.info("Session cluster is being deleted, skipping job
cancellation");
- return DeleteControl.defaultDelete();
+ var observeConfig = ctx.getObserveConfig();
+ var blockOnSessionJobs =
+ observeConfig != null
+ && observeConfig.get(
+
KubernetesOperatorConfigOptions.BLOCK_ON_SESSION_JOBS);
+ if (!blockOnSessionJobs) {
+ LOG.info("Session cluster is being deleted, skipping job
cancellation");
+ return DeleteControl.defaultDelete();
+ }
}
if (!sessionClusterReady(flinkDepOptional)) {
@@ -189,9 +197,9 @@ public class SessionJobReconciler
}
try {
- var observeConfig = ctx.getObserveConfig();
var suspendMode =
-
observeConfig.getBoolean(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION)
+ ctx.getObserveConfig()
+
.get(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION)
? SuspendMode.SAVEPOINT
: SuspendMode.STATELESS;
if (cancelJob(ctx, suspendMode)) {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index e80f2eba..83f8f2cc 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -266,6 +266,24 @@ public class TestUtils extends BaseTestUtils {
};
}
+ /**
+ * Builds a test {@link Context} whose secondary resource is a session-cluster FlinkDeployment
+ * with a READY JobManager, the given lifecycle state, and the given entries added to its Flink
+ * configuration. The resulting spec is also stored as the last reconciled spec.
+ *
+ * <p>A fresh session cluster is built on every {@code getSecondaryResource} call; the
+ * {@code expectedType} and {@code eventSourceName} arguments are ignored.
+ *
+ * @param lifecycleState lifecycle state to set on the session cluster status (e.g. DELETING)
+ * @param flinkDepConfig Flink configuration entries added to the session cluster spec
+ * @return a context exposing the configured session cluster as its secondary resource
+ */
+ public static <T extends HasMetadata>
+ Context<T> createContextWithReadyFlinkDeploymentInLifecycleState(
+ ResourceLifecycleState lifecycleState, Map<String, String>
flinkDepConfig) {
+ return new TestingContext<>() {
+ @Override
+ public Optional<T> getSecondaryResource(Class expectedType, String
eventSourceName) {
+ var session = buildSessionCluster();
+ session.getStatus().setLifecycleState(lifecycleState);
+
session.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+
session.getSpec().getFlinkConfiguration().putAllFrom(flinkDepConfig);
+ // Serialize after applying flinkDepConfig so the last reconciled spec carries the
+ // extra entries; presumably this is what the session job's observe config is derived
+ // from (NOTE(review): confirm against ctx.getObserveConfig()).
+ session.getStatus()
+ .getReconciliationStatus()
+ .serializeAndSetLastReconciledSpec(session.getSpec(),
session);
+ return (Optional<T>) Optional.of(session);
+ }
+ };
+ }
+
public static <T extends HasMetadata> Context<T>
createContextWithUnhealthyFlinkDeployment(
boolean haEnabled) {
return new TestingContext<>() {
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index 186de7eb..3416d474 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -48,6 +48,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
@@ -907,6 +908,63 @@ public class SessionJobReconcilerTest extends
OperatorTestBase {
assertEquals(RUNNING, flinkService.listJobs().get(0).f1.getJobState());
}
+ /**
+ * When the parent session cluster is DELETING or DELETED and BLOCK_ON_SESSION_JOBS is
+ * explicitly set to false, cleanup takes the shortcut path. It removes the finalizer right away
+ * and does not cancel the job, which stays RUNNING on the cluster.
+ */
+ @ParameterizedTest
+ @EnumSource(
+ value = ResourceLifecycleState.class,
+ names = {"DELETING", "DELETED"})
+ public void testCleanupWithDeletingClusterBlockOnSessionJobsDisabled(
+ ResourceLifecycleState lifecycleState) throws Exception {
+ FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+ // Submit the job against a healthy session cluster and mark it RUNNING in the CR status.
+ reconciler.reconcile(
+ sessionJob,
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
+ assertEquals(1, flinkService.listJobs().size());
+ verifyAndSetRunningJobsToStatus(
+ sessionJob, JobState.RUNNING, RECONCILING, null,
flinkService.listJobs());
+
+ // Switch to a context whose session cluster is being deleted, with blocking disabled.
+ var ctx =
+
TestUtils.createContextWithReadyFlinkDeploymentInLifecycleState(
+ lifecycleState,
+ Map.of(
+
KubernetesOperatorConfigOptions.BLOCK_ON_SESSION_JOBS.key(),
+ "false"));
+ var deleteControl = reconciler.cleanup(sessionJob, ctx);
+
+ assertTrue(deleteControl.isRemoveFinalizer());
+ // Bypass fired: no cancellation attempted, job still RUNNING.
+ assertEquals(RUNNING, flinkService.listJobs().get(0).f1.getJobState());
+ }
+
+ /**
+ * When the parent session cluster is DELETING or DELETED but BLOCK_ON_SESSION_JOBS is enabled,
+ * cleanup must not skip cancellation. The job is cancelled on the cluster, and the finalizer is
+ * kept with a rescheduled delete. Per the commit subject (FLINK-39618), skipping cancellation
+ * here could deadlock FlinkDeployment deletion while session jobs are still running.
+ */
+ @ParameterizedTest
+ @EnumSource(
+ value = ResourceLifecycleState.class,
+ names = {"DELETING", "DELETED"})
+ public void testCleanupWithDeletingClusterBlockOnSessionJobsEnabled(
+ ResourceLifecycleState lifecycleState) throws Exception {
+ FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+ // Submit the job against a healthy session cluster and mark it RUNNING in the CR status.
+ reconciler.reconcile(
+ sessionJob,
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
+ assertEquals(1, flinkService.listJobs().size());
+ verifyAndSetRunningJobsToStatus(
+ sessionJob, JobState.RUNNING, RECONCILING, null,
flinkService.listJobs());
+
+ // Switch to a context whose session cluster is being deleted, with blocking enabled.
+ var ctx =
+
TestUtils.createContextWithReadyFlinkDeploymentInLifecycleState(
+ lifecycleState,
+ Map.of(
+
KubernetesOperatorConfigOptions.BLOCK_ON_SESSION_JOBS.key(),
+ "true"));
+ var deleteControl = reconciler.cleanup(sessionJob, ctx);
+
+ // Cancellation was initiated: cancelJobOrError returns pending so the
reconciler
+ // reschedules and holds the finalizer until the cancel is
re-observed. The job has
+ // already transitioned to CANCELED on the cluster side
(TestingFlinkService).
+ assertFalse(deleteControl.isRemoveFinalizer());
+ // NOTE(review): 10_000L presumably mirrors the operator's default reschedule interval;
+ // confirm, since hard-coding it couples this test to that default.
+ assertEquals(10_000L, deleteControl.getScheduleDelay().orElse(null));
+ assertEquals(CANCELED,
flinkService.listJobs().get(0).f1.getJobState());
+ }
+
@Test
public void testCleanupWithUnhealthySessionClusterNoHa() throws Exception {
FlinkSessionJob sessionJob = TestUtils.buildSessionJob();