This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new ef02fa8e [FLINK-28648] Allow session deletion to block on any running job
ef02fa8e is described below

commit ef02fa8edc942d679ada750f143f389d66c15e6b
Author: nishita-09 <74376090+nishita...@users.noreply.github.com>
AuthorDate: Mon Jul 21 17:51:36 2025 +0530

    [FLINK-28648] Allow session deletion to block on any running job
---
 .../shortcodes/generated/dynamic_section.html      |   6 +
 .../kubernetes_operator_config_configuration.html  |   6 +
 .../config/KubernetesOperatorConfigOptions.java    |   8 +
 .../reconciler/deployment/SessionReconciler.java   |  78 +++++++++-
 .../deployment/SessionReconcilerTest.java          | 165 +++++++++++++++++++++
 5 files changed, 256 insertions(+), 7 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html b/docs/layouts/shortcodes/generated/dynamic_section.html
index 54c1eb69..f81afa4f 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -194,6 +194,12 @@
             <td>Duration</td>
             <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 4a72d8b7..2ba0de9e 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -416,6 +416,12 @@
             <td>Duration</td>
             <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index 6f5891cc..bd5e0a46 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -647,6 +647,14 @@ public class KubernetesOperatorConfigOptions {
                     .withDescription(
                             "Indicate whether the job should be drained when 
stopping with savepoint.");
 
+    @Documentation.Section(SECTION_DYNAMIC)
+    public static final ConfigOption<Boolean> BLOCK_ON_UNMANAGED_JOBS =
+            operatorConfig("session.block-on-unmanaged-jobs")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.");
+
     @Documentation.Section(SECTION_ADVANCED)
     public static final ConfigOption<Duration> REFRESH_CLUSTER_RESOURCE_VIEW =
             operatorConfig("cluster.resource-view.refresh-interval")
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
index c628bf94..809fbfbd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.autoscaler.NoopJobAutoscaler;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -25,11 +27,16 @@ import org.apache.flink.kubernetes.operator.api.diff.DiffType;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.IngressUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 
 import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
 import org.slf4j.Logger;
@@ -120,6 +127,37 @@ public class SessionReconciler
                 .setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
     }
 
+    // Detects jobs which are not in globally terminated states
+    @VisibleForTesting
+    Set<JobID> getNonTerminalJobs(FlinkResourceContext<FlinkDeployment> ctx) {
+        LOG.debug("Starting nonTerminal jobs detection for session cluster");
+        try {
+            // Get all jobs running in the Flink cluster
+            var flinkService = ctx.getFlinkService();
+            var clusterClient = flinkService.getClusterClient(ctx.getObserveConfig());
+            var allJobs =
+                    clusterClient
+                            .sendRequest(
+                                    JobsOverviewHeaders.getInstance(),
+                                    EmptyMessageParameters.getInstance(),
+                                    EmptyRequestBody.getInstance())
+                            .get()
+                            .getJobs();
+
+            // running job Ids
+            Set<JobID> nonTerminalJobIds =
+                    allJobs.stream()
+                            .filter(job -> !job.getStatus().isGloballyTerminalState())
+                            .map(JobDetails::getJobId)
+                            .collect(Collectors.toSet());
+
+            return nonTerminalJobIds;
+        } catch (Exception e) {
+            LOG.warn("Failed to detect nonTerminal jobs in session cluster", e);
+            return Set.of();
+        }
+    }
+
     @Override
     public DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> ctx) {
         Set<FlinkSessionJob> sessionJobs =
@@ -143,13 +181,39 @@ public class SessionReconciler
             }
             return DeleteControl.noFinalizerRemoval()
                     .rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis());
-        } else {
-            LOG.info("Stopping session cluster");
-            var conf = ctx.getDeployConfig(ctx.getResource().getSpec());
-            ctx.getFlinkService()
-                    .deleteClusterDeployment(
-                            deployment.getMetadata(), deployment.getStatus(), conf, true);
-            return DeleteControl.defaultDelete();
         }
+
+        // Check for non-terminated jobs if the option is enabled (Enabled by default) , after
+        // sessionJobs are deleted
+        boolean blockOnUnmanagedJobs =
+                ctx.getObserveConfig()
+                        .getBoolean(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS);
+        if (blockOnUnmanagedJobs) {
+            Set<JobID> nonTerminalJobs = getNonTerminalJobs(ctx);
+            if (!nonTerminalJobs.isEmpty()) {
+                var error =
+                        String.format(
+                                "The session cluster has non terminated jobs %s that should be cancelled first",
+                                nonTerminalJobs.stream()
+                                        .map(JobID::toHexString)
+                                        .collect(Collectors.toList()));
+                eventRecorder.triggerEvent(
+                        deployment,
+                        EventRecorder.Type.Warning,
+                        EventRecorder.Reason.CleanupFailed,
+                        EventRecorder.Component.Operator,
+                        error,
+                        ctx.getKubernetesClient());
+                return DeleteControl.noFinalizerRemoval()
+                        .rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis());
+            }
+        }
+
+        LOG.info("Stopping session cluster");
+        var conf = ctx.getObserveConfig();
+        ctx.getFlinkService()
+                .deleteClusterDeployment(
+                        deployment.getMetadata(), deployment.getStatus(), conf, true);
+        return DeleteControl.defaultDelete();
     }
 }
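For context, getNonTerminalJobs keys off JobStatus#isGloballyTerminalState(), so only jobs whose
state can still change keep the finalizer in place. A small illustrative sketch of the predicate
(not part of the patch):

    import org.apache.flink.api.common.JobStatus;

    // FINISHED, CANCELED and FAILED are globally terminal and never block deletion;
    // RUNNING, RESTARTING, CREATED, etc. are reported by getNonTerminalJobs.
    static boolean blocksSessionDeletion(JobStatus status) {
        return !status.isGloballyTerminalState();
    }
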
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
index 54484065..9f84f2c1 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java
@@ -17,17 +17,23 @@
 
 package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.OperatorTestBase;
 import org.apache.flink.kubernetes.operator.TestUtils;
 import org.apache.flink.kubernetes.operator.TestingFlinkService;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
 import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter;
+import org.apache.flink.runtime.client.JobStatusMessage;
 
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.client.KubernetesClient;
@@ -36,6 +42,7 @@ import lombok.Getter;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -43,6 +50,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
@@ -139,4 +147,161 @@ public class SessionReconcilerTest extends OperatorTestBase {
                 deployConfig.get(KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE);
         Assertions.assertEquals(expectedOwnerReferences, or);
     }
+
+    @Test
+    public void testGetNonTerminalJobs() throws Exception {
+        FlinkDeployment deployment = TestUtils.buildSessionCluster();
+        deployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS.key(), "true");
+
+        assertEquals(
+                "true",
+                deployment
+                        .getSpec()
+                        .getFlinkConfiguration()
+                        .get(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS.key()));
+
+        reconciler.reconcile(deployment, flinkService.getContext());
+
+        // Verify deployment is in DEPLOYED state
+        assertEquals(
+                ReconciliationState.DEPLOYED,
+                deployment.getStatus().getReconciliationStatus().getState());
+
+        // Create different types of jobs
+        JobID managedJobId1 = new JobID();
+        JobID managedJobId2 = new JobID();
+        JobID unmanagedRunningJobId1 = new JobID();
+        JobID unmanagedTerminatedJobId = new JobID();
+        JobID unmanagedRunningJobId2 = new JobID();
+
+        // Add jobs to the testing service
+        flinkService
+                .listJobs()
+                .add(
+                        Tuple3.of(
+                                null,
+                                new JobStatusMessage(
+                                        managedJobId1,
+                                        "managed-job-1",
+                                        JobStatus.RUNNING,
+                                        System.currentTimeMillis()),
+                                new Configuration()));
+        flinkService
+                .listJobs()
+                .add(
+                        Tuple3.of(
+                                null,
+                                new JobStatusMessage(
+                                        managedJobId2,
+                                        "managed-job-2",
+                                        JobStatus.RUNNING,
+                                        System.currentTimeMillis()),
+                                new Configuration()));
+        flinkService
+                .listJobs()
+                .add(
+                        Tuple3.of(
+                                null,
+                                new JobStatusMessage(
+                                        unmanagedRunningJobId1,
+                                        "unmanaged-running-job-1",
+                                        JobStatus.RUNNING,
+                                        System.currentTimeMillis()),
+                                new Configuration()));
+        flinkService
+                .listJobs()
+                .add(
+                        Tuple3.of(
+                                null,
+                                new JobStatusMessage(
+                                        unmanagedTerminatedJobId,
+                                        "unmanaged-terminated-job",
+                                        JobStatus.CANCELED,
+                                        System.currentTimeMillis()),
+                                new Configuration()));
+        flinkService
+                .listJobs()
+                .add(
+                        Tuple3.of(
+                                null,
+                                new JobStatusMessage(
+                                        unmanagedRunningJobId2,
+                                        "unmanaged-running-job-2",
+                                        JobStatus.RUNNING,
+                                        System.currentTimeMillis()),
+                                new Configuration()));
+
+        // Create FlinkSessionJob resources for the managed jobs
+        FlinkSessionJob managedSessionJob1 = TestUtils.buildSessionJob();
+        managedSessionJob1.getMetadata().setName("managed-session-job-1");
+        managedSessionJob1.getStatus().getJobStatus().setJobId(managedJobId1.toHexString());
+        kubernetesClient.resource(managedSessionJob1).createOrReplace();
+
+        FlinkSessionJob managedSessionJob2 = TestUtils.buildSessionJob();
+        managedSessionJob2.getMetadata().setName("managed-session-job-2");
+        managedSessionJob2.getStatus().getJobStatus().setJobId(managedJobId2.toHexString());
+        kubernetesClient.resource(managedSessionJob2).createOrReplace();
+
+        Set<FlinkSessionJob> sessionJobs = new HashSet<>();
+        sessionJobs.add(managedSessionJob1);
+        sessionJobs.add(managedSessionJob2);
+
+        // Test with blocking enabled - should identify all non-terminal jobs
+        var context = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
+        var resourceContext = getResourceContext(deployment, context);
+
+        var sessionReconciler = (SessionReconciler) reconciler.getReconciler();
+        Set<JobID> nonTerminalJobs = sessionReconciler.getNonTerminalJobs(resourceContext);
+
+        // Verify all non-terminal jobs are identified - should be 4 (2 managed + 2 unmanaged
+        // running)
+        assertEquals(4, nonTerminalJobs.size(), "Should identify exactly 4 non-terminal jobs");
+
+        assertTrue(
+                nonTerminalJobs.contains(unmanagedRunningJobId1),
+                "Should contain unmanagedRunningJobId1");
+        assertTrue(
+                nonTerminalJobs.contains(unmanagedRunningJobId2),
+                "Should contain unmanagedRunningJobId2");
+
+        // Verify terminated job is not included
+        assertFalse(
+                nonTerminalJobs.contains(unmanagedTerminatedJobId),
+                "Should not contain terminated job");
+
+        // Test scenario with only unmanaged jobs
+        flinkService
+                .listJobs()
+                .removeIf(
+                        job ->
+                                job.f1.getJobId().equals(managedJobId1)
+                                        || job.f1.getJobId().equals(managedJobId2));
+
+        Set<JobID> nonTerminalJobsAfterSessionJobsRemoval =
+                sessionReconciler.getNonTerminalJobs(resourceContext);
+
+        assertEquals(
+                2,
+                nonTerminalJobsAfterSessionJobsRemoval.size(),
+                "Should have 2 non-terminal jobs when sessionjobs are deleted");
+
+        // Test scenario with no running jobs
+        flinkService
+                .listJobs()
+                .removeIf(
+                        job ->
+                                job.f1.getJobId().equals(unmanagedRunningJobId1)
+                                        || job.f1.getJobId().equals(unmanagedRunningJobId2));
+
+        Set<JobID> nonTerminalJobsAfterRemoval =
+                sessionReconciler.getNonTerminalJobs(resourceContext);
+
+        assertEquals(
+                0,
+                nonTerminalJobsAfterRemoval.size(),
+                "Should have no non-terminal jobs when only terminated jobs exist");
+    }
 }
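When deletion is blocked, the operator only reschedules the reconcile loop; once the unmanaged jobs
reach a globally terminal state the cluster is removed on the next pass. A hedged sketch of
unblocking deletion programmatically, assuming a ClusterClient obtained the same way as in
getNonTerminalJobs above and a nonTerminalJobIds set holding the IDs reported in the warning event:

    // Sketch, not part of the commit: cancel the unmanaged jobs, equivalent to
    // running `flink cancel <jobId>` against the session cluster's REST endpoint.
    var clusterClient = flinkService.getClusterClient(observeConfig);
    for (JobID jobId : nonTerminalJobIds) {
        clusterClient.cancel(jobId).get();
    }
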
