This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new ef02fa8e [FLINK-28648] Allow session deletion to block on any running job ef02fa8e is described below commit ef02fa8edc942d679ada750f143f389d66c15e6b Author: nishita-09 <74376090+nishita...@users.noreply.github.com> AuthorDate: Mon Jul 21 17:51:36 2025 +0530 [FLINK-28648] Allow session deletion to block on any running job --- .../shortcodes/generated/dynamic_section.html | 6 + .../kubernetes_operator_config_configuration.html | 6 + .../config/KubernetesOperatorConfigOptions.java | 8 + .../reconciler/deployment/SessionReconciler.java | 78 +++++++++- .../deployment/SessionReconcilerTest.java | 165 +++++++++++++++++++++ 5 files changed, 256 insertions(+), 7 deletions(-) diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html b/docs/layouts/shortcodes/generated/dynamic_section.html index 54c1eb69..f81afa4f 100644 --- a/docs/layouts/shortcodes/generated/dynamic_section.html +++ b/docs/layouts/shortcodes/generated/dynamic_section.html @@ -194,6 +194,12 @@ <td>Duration</td> <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td> </tr> + <tr> + <td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.</td> + </tr> <tr> <td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td> <td style="word-wrap: break-word;">true</td> diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html index 4a72d8b7..2ba0de9e 100644 --- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html +++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html @@ -416,6 +416,12 @@ <td>Duration</td> <td>The interval before a savepoint trigger attempt is marked as unsuccessful.</td> </tr> + <tr> + <td><h5>kubernetes.operator.session.block-on-unmanaged-jobs</h5></td> + <td style="word-wrap: break-word;">true</td> + <td>Boolean</td> + <td>Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI.</td> + </tr> <tr> <td><h5>kubernetes.operator.snapshot.resource.enabled</h5></td> <td style="word-wrap: break-word;">true</td> diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java index 6f5891cc..bd5e0a46 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java @@ -647,6 +647,14 @@ public class KubernetesOperatorConfigOptions { .withDescription( "Indicate whether the job should be drained when stopping with savepoint."); + @Documentation.Section(SECTION_DYNAMIC) + public static final ConfigOption<Boolean> BLOCK_ON_UNMANAGED_JOBS = + operatorConfig("session.block-on-unmanaged-jobs") + .booleanType() + .defaultValue(true) + .withDescription( + "Block FlinkDeployment deletion if unmanaged jobs (jobs not managed by FlinkSessionJob resources) are running in the session cluster. Example: Jobs submitted via CLI."); + @Documentation.Section(SECTION_ADVANCED) public static final ConfigOption<Duration> REFRESH_CLUSTER_RESOURCE_VIEW = operatorConfig("cluster.resource-view.refresh-interval") diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java index c628bf94..809fbfbd 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java @@ -17,6 +17,8 @@ package org.apache.flink.kubernetes.operator.reconciler.deployment; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; import org.apache.flink.autoscaler.NoopJobAutoscaler; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; @@ -25,11 +27,16 @@ import org.apache.flink.kubernetes.operator.api.diff.DiffType; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; +import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.IngressUtils; import org.apache.flink.kubernetes.operator.utils.StatusRecorder; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders; import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; import org.slf4j.Logger; @@ -120,6 +127,37 @@ public class SessionReconciler .setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING); } + // Detects jobs which are not in globally terminated states + @VisibleForTesting + Set<JobID> getNonTerminalJobs(FlinkResourceContext<FlinkDeployment> ctx) { + LOG.debug("Starting nonTerminal jobs detection for session cluster"); + try { + // Get all jobs running in the Flink cluster + var flinkService = ctx.getFlinkService(); + var clusterClient = flinkService.getClusterClient(ctx.getObserveConfig()); + var allJobs = + clusterClient + .sendRequest( + JobsOverviewHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance()) + .get() + .getJobs(); + + // running job Ids + Set<JobID> nonTerminalJobIds = + allJobs.stream() + .filter(job -> !job.getStatus().isGloballyTerminalState()) + .map(JobDetails::getJobId) + .collect(Collectors.toSet()); + + return nonTerminalJobIds; + } catch (Exception e) { + LOG.warn("Failed to detect nonTerminal jobs in session cluster", e); + return Set.of(); + } + } + @Override public DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> ctx) { Set<FlinkSessionJob> sessionJobs = @@ -143,13 +181,39 @@ public class SessionReconciler } return DeleteControl.noFinalizerRemoval() .rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis()); - } else { - LOG.info("Stopping session cluster"); - var conf = ctx.getDeployConfig(ctx.getResource().getSpec()); - ctx.getFlinkService() - .deleteClusterDeployment( - deployment.getMetadata(), deployment.getStatus(), conf, true); - return DeleteControl.defaultDelete(); } + + // Check for non-terminated jobs if the option is enabled (Enabled by default) , after + // sessionJobs are deleted + boolean blockOnUnmanagedJobs = + ctx.getObserveConfig() + .getBoolean(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS); + if (blockOnUnmanagedJobs) { + Set<JobID> nonTerminalJobs = getNonTerminalJobs(ctx); + if (!nonTerminalJobs.isEmpty()) { + var error = + String.format( + "The session cluster has non terminated jobs %s that should be cancelled first", + nonTerminalJobs.stream() + .map(JobID::toHexString) + .collect(Collectors.toList())); + eventRecorder.triggerEvent( + deployment, + EventRecorder.Type.Warning, + EventRecorder.Reason.CleanupFailed, + EventRecorder.Component.Operator, + error, + ctx.getKubernetesClient()); + return DeleteControl.noFinalizerRemoval() + .rescheduleAfter(ctx.getOperatorConfig().getReconcileInterval().toMillis()); + } + } + + LOG.info("Stopping session cluster"); + var conf = ctx.getObserveConfig(); + ctx.getFlinkService() + .deleteClusterDeployment( + deployment.getMetadata(), deployment.getStatus(), conf, true); + return DeleteControl.defaultDelete(); } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java index 54484065..9f84f2c1 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java @@ -17,17 +17,23 @@ package org.apache.flink.kubernetes.operator.reconciler.deployment; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.operator.OperatorTestBase; import org.apache.flink.kubernetes.operator.TestUtils; import org.apache.flink.kubernetes.operator.TestingFlinkService; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; +import org.apache.flink.kubernetes.operator.api.FlinkSessionJob; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus; import org.apache.flink.kubernetes.operator.api.status.ReconciliationState; +import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter; +import org.apache.flink.runtime.client.JobStatusMessage; import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.client.KubernetesClient; @@ -36,6 +42,7 @@ import lombok.Getter; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -43,6 +50,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -139,4 +147,161 @@ public class SessionReconcilerTest extends OperatorTestBase { deployConfig.get(KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE); Assertions.assertEquals(expectedOwnerReferences, or); } + + @Test + public void testGetNonTerminalJobs() throws Exception { + FlinkDeployment deployment = TestUtils.buildSessionCluster(); + deployment + .getSpec() + .getFlinkConfiguration() + .put(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS.key(), "true"); + + assertEquals( + "true", + deployment + .getSpec() + .getFlinkConfiguration() + .get(KubernetesOperatorConfigOptions.BLOCK_ON_UNMANAGED_JOBS.key())); + + reconciler.reconcile(deployment, flinkService.getContext()); + + // Verify deployment is in DEPLOYED state + assertEquals( + ReconciliationState.DEPLOYED, + deployment.getStatus().getReconciliationStatus().getState()); + + // Create different types of jobs + JobID managedJobId1 = new JobID(); + JobID managedJobId2 = new JobID(); + JobID unmanagedRunningJobId1 = new JobID(); + JobID unmanagedTerminatedJobId = new JobID(); + JobID unmanagedRunningJobId2 = new JobID(); + + // Add jobs to the testing service + flinkService + .listJobs() + .add( + Tuple3.of( + null, + new JobStatusMessage( + managedJobId1, + "managed-job-1", + JobStatus.RUNNING, + System.currentTimeMillis()), + new Configuration())); + flinkService + .listJobs() + .add( + Tuple3.of( + null, + new JobStatusMessage( + managedJobId2, + "managed-job-2", + JobStatus.RUNNING, + System.currentTimeMillis()), + new Configuration())); + flinkService + .listJobs() + .add( + Tuple3.of( + null, + new JobStatusMessage( + unmanagedRunningJobId1, + "unmanaged-running-job-1", + JobStatus.RUNNING, + System.currentTimeMillis()), + new Configuration())); + flinkService + .listJobs() + .add( + Tuple3.of( + null, + new JobStatusMessage( + unmanagedTerminatedJobId, + "unmanaged-terminated-job", + JobStatus.CANCELED, + System.currentTimeMillis()), + new Configuration())); + flinkService + .listJobs() + .add( + Tuple3.of( + null, + new JobStatusMessage( + unmanagedRunningJobId2, + "unmanaged-running-job-2", + JobStatus.RUNNING, + System.currentTimeMillis()), + new Configuration())); + + // Create FlinkSessionJob resources for the managed jobs + FlinkSessionJob managedSessionJob1 = TestUtils.buildSessionJob(); + managedSessionJob1.getMetadata().setName("managed-session-job-1"); + managedSessionJob1.getStatus().getJobStatus().setJobId(managedJobId1.toHexString()); + kubernetesClient.resource(managedSessionJob1).createOrReplace(); + + FlinkSessionJob managedSessionJob2 = TestUtils.buildSessionJob(); + managedSessionJob2.getMetadata().setName("managed-session-job-2"); + managedSessionJob2.getStatus().getJobStatus().setJobId(managedJobId2.toHexString()); + kubernetesClient.resource(managedSessionJob2).createOrReplace(); + + Set<FlinkSessionJob> sessionJobs = new HashSet<>(); + sessionJobs.add(managedSessionJob1); + sessionJobs.add(managedSessionJob2); + + // Test with blocking enabled - should identify all non-terminal jobs + var context = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient); + var resourceContext = getResourceContext(deployment, context); + + var sessionReconciler = (SessionReconciler) reconciler.getReconciler(); + Set<JobID> nonTerminalJobs = sessionReconciler.getNonTerminalJobs(resourceContext); + + // Verify all non-terminal jobs are identified - should be 4 (2 managed + 2 unmanaged + // running) + assertEquals(4, nonTerminalJobs.size(), "Should identify exactly 4 non-terminal jobs"); + + assertTrue( + nonTerminalJobs.contains(unmanagedRunningJobId1), + "Should contain unmanagedRunningJobId1"); + assertTrue( + nonTerminalJobs.contains(unmanagedRunningJobId2), + "Should contain unmanagedRunningJobId2"); + + // Verify terminated job is not included + assertFalse( + nonTerminalJobs.contains(unmanagedTerminatedJobId), + "Should not contain terminated job"); + + // Test scenario with only unmanaged jobs + flinkService + .listJobs() + .removeIf( + job -> + job.f1.getJobId().equals(managedJobId1) + || job.f1.getJobId().equals(managedJobId2)); + + Set<JobID> nonTerminalJobsAfterSessionJobsRemoval = + sessionReconciler.getNonTerminalJobs(resourceContext); + + assertEquals( + 2, + nonTerminalJobsAfterSessionJobsRemoval.size(), + "Should have 2 non-terminal jobs when sessionjobs are deleted"); + + // Test scenario with no running jobs + flinkService + .listJobs() + .removeIf( + job -> + job.f1.getJobId().equals(unmanagedRunningJobId1) + || job.f1.getJobId().equals(unmanagedRunningJobId2)); + + Set<JobID> nonTerminalJobsAfterRemoval = + sessionReconciler.getNonTerminalJobs(resourceContext); + + assertEquals( + 0, + nonTerminalJobsAfterRemoval.size(), + "Should have no non-terminal jobs when only terminated jobs exist"); + } }