This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new cc2a9517 [FLINK-39618] FlinkDeployment deletion deadlocks when FlinkSessionJobs are running with default block-on-* options
cc2a9517 is described below

commit cc2a9517ddb3477b471ae1cfcd525aff42e10114
Author: Dennis-Mircea Ciupitu <[email protected]>
AuthorDate: Wed May 6 21:41:43 2026 +0300

    [FLINK-39618] FlinkDeployment deletion deadlocks when FlinkSessionJobs are running with default block-on-* options
---
 .../sessionjob/SessionJobReconciler.java           | 18 +++++--
 .../flink/kubernetes/operator/TestUtils.java       | 18 +++++++
 .../sessionjob/SessionJobReconcilerTest.java       | 58 ++++++++++++++++++++++
 3 files changed, 89 insertions(+), 5 deletions(-)

diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
index 974c366b..4491ab97 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconciler.java
@@ -158,12 +158,20 @@ public class SessionJobReconciler
         var flinkDep = flinkDepOptional.get();
 
         // If the session cluster is being deleted, the job will not survive 
regardless,
-        // so there is no need to explicitly cancel it.
+        // so there is no need to explicitly cancel it, unless 
BLOCK_ON_SESSION_JOBS is
+        // enabled.
         var sessionLifecycleState = flinkDep.getStatus().getLifecycleState();
         if (sessionLifecycleState == ResourceLifecycleState.DELETING
                 || sessionLifecycleState == ResourceLifecycleState.DELETED) {
-            LOG.info("Session cluster is being deleted, skipping job 
cancellation");
-            return DeleteControl.defaultDelete();
+            var observeConfig = ctx.getObserveConfig();
+            var blockOnSessionJobs =
+                    observeConfig != null
+                            && observeConfig.get(
+                                    
KubernetesOperatorConfigOptions.BLOCK_ON_SESSION_JOBS);
+            if (!blockOnSessionJobs) {
+                LOG.info("Session cluster is being deleted, skipping job 
cancellation");
+                return DeleteControl.defaultDelete();
+            }
         }
 
         if (!sessionClusterReady(flinkDepOptional)) {
@@ -189,9 +197,9 @@ public class SessionJobReconciler
         }
 
         try {
-            var observeConfig = ctx.getObserveConfig();
             var suspendMode =
-                    
observeConfig.getBoolean(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION)
+                    ctx.getObserveConfig()
+                                    
.get(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION)
                             ? SuspendMode.SAVEPOINT
                             : SuspendMode.STATELESS;
             if (cancelJob(ctx, suspendMode)) {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index e80f2eba..83f8f2cc 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -266,6 +266,24 @@ public class TestUtils extends BaseTestUtils {
         };
     }
 
+    public static <T extends HasMetadata>
+            Context<T> createContextWithReadyFlinkDeploymentInLifecycleState(
+                    ResourceLifecycleState lifecycleState, Map<String, String> 
flinkDepConfig) {
+        return new TestingContext<>() {
+            @Override
+            public Optional<T> getSecondaryResource(Class expectedType, String 
eventSourceName) {
+                var session = buildSessionCluster();
+                session.getStatus().setLifecycleState(lifecycleState);
+                
session.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
+                
session.getSpec().getFlinkConfiguration().putAllFrom(flinkDepConfig);
+                session.getStatus()
+                        .getReconciliationStatus()
+                        .serializeAndSetLastReconciledSpec(session.getSpec(), 
session);
+                return (Optional<T>) Optional.of(session);
+            }
+        };
+    }
+
     public static <T extends HasMetadata> Context<T> 
createContextWithUnhealthyFlinkDeployment(
             boolean haEnabled) {
         return new TestingContext<>() {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
index 186de7eb..3416d474 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/sessionjob/SessionJobReconcilerTest.java
@@ -48,6 +48,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -907,6 +908,63 @@ public class SessionJobReconcilerTest extends OperatorTestBase {
         assertEquals(RUNNING, flinkService.listJobs().get(0).f1.getJobState());
     }
 
+    @ParameterizedTest
+    @EnumSource(
+            value = ResourceLifecycleState.class,
+            names = {"DELETING", "DELETED"})
+    public void testCleanupWithDeletingClusterBlockOnSessionJobsDisabled(
+            ResourceLifecycleState lifecycleState) throws Exception {
+        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+        reconciler.reconcile(
+                sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
+        assertEquals(1, flinkService.listJobs().size());
+        verifyAndSetRunningJobsToStatus(
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
+
+        var ctx =
+                
TestUtils.createContextWithReadyFlinkDeploymentInLifecycleState(
+                        lifecycleState,
+                        Map.of(
+                                
KubernetesOperatorConfigOptions.BLOCK_ON_SESSION_JOBS.key(),
+                                "false"));
+        var deleteControl = reconciler.cleanup(sessionJob, ctx);
+
+        assertTrue(deleteControl.isRemoveFinalizer());
+        // Bypass fired: no cancellation attempted, job still RUNNING.
+        assertEquals(RUNNING, flinkService.listJobs().get(0).f1.getJobState());
+    }
+
+    @ParameterizedTest
+    @EnumSource(
+            value = ResourceLifecycleState.class,
+            names = {"DELETING", "DELETED"})
+    public void testCleanupWithDeletingClusterBlockOnSessionJobsEnabled(
+            ResourceLifecycleState lifecycleState) throws Exception {
+        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
+
+        reconciler.reconcile(
+                sessionJob, 
TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
+        assertEquals(1, flinkService.listJobs().size());
+        verifyAndSetRunningJobsToStatus(
+                sessionJob, JobState.RUNNING, RECONCILING, null, 
flinkService.listJobs());
+
+        var ctx =
+                
TestUtils.createContextWithReadyFlinkDeploymentInLifecycleState(
+                        lifecycleState,
+                        Map.of(
+                                
KubernetesOperatorConfigOptions.BLOCK_ON_SESSION_JOBS.key(),
+                                "true"));
+        var deleteControl = reconciler.cleanup(sessionJob, ctx);
+
+        // Cancellation was initiated: cancelJobOrError returns pending so the 
reconciler
+        // reschedules and holds the finalizer until the cancel is 
re-observed. The job has
+        // already transitioned to CANCELED on the cluster side 
(TestingFlinkService).
+        assertFalse(deleteControl.isRemoveFinalizer());
+        assertEquals(10_000L, deleteControl.getScheduleDelay().orElse(null));
+        assertEquals(CANCELED, 
flinkService.listJobs().get(0).f1.getJobState());
+    }
+
     @Test
     public void testCleanupWithUnhealthySessionClusterNoHa() throws Exception {
         FlinkSessionJob sessionJob = TestUtils.buildSessionJob();

Reply via email to