This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 6e597d4c [FLINK-29609] Shut down JM for terminated applications after configured duration
6e597d4c is described below

commit 6e597d4c0a144a01840b40e6b669e558970c55d0
Author: Gyula Fora <[email protected]>
AuthorDate: Fri Nov 11 14:41:11 2022 +0100

    [FLINK-29609] Shut down JM for terminated applications after configured duration
---
 .../shortcodes/generated/dynamic_section.html      |  6 ++++
 .../kubernetes_operator_config_configuration.html  |  6 ++++
 .../config/KubernetesOperatorConfigOptions.java    |  8 +++++
 .../AbstractFlinkResourceReconciler.java           | 11 +++++-
 .../deployment/ApplicationReconciler.java          | 28 ++++++++++++++-
 .../operator/service/AbstractFlinkService.java     | 26 ++++++++++++++
 .../operator/service/NativeFlinkService.java       | 42 +++++-----------------
 .../operator/service/StandaloneFlinkService.java   | 10 ++----
 .../kubernetes/operator/TestingFlinkService.java   |  7 +---
 .../deployment/ApplicationReconcilerTest.java      | 38 ++++++++++++++++++++
 10 files changed, 133 insertions(+), 49 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html 
b/docs/layouts/shortcodes/generated/dynamic_section.html
index 6bfb39cc..51e1b313 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -44,6 +44,12 @@
             <td>Boolean</td>
             <td>Whether to enable recovery of missing/deleted jobmanager 
deployments.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.jm-deployment.shutdown-ttl</h5></td>
+            <td style="word-wrap: break-word;">86400000 ms</td>
+            <td>Duration</td>
+            <td>Time after which jobmanager pods of terminal application 
deployments are shut down.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.job.restart.failed</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index b243d659..5e69a439 100644
--- 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -122,6 +122,12 @@
             <td>Boolean</td>
             <td>Whether to enable recovery of missing/deleted jobmanager 
deployments.</td>
         </tr>
+        <tr>
+            <td><h5>kubernetes.operator.jm-deployment.shutdown-ttl</h5></td>
+            <td style="word-wrap: break-word;">86400000 ms</td>
+            <td>Duration</td>
+            <td>Time after which jobmanager pods of terminal application 
deployments are shut down.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.job.restart.failed</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index 508de68e..ea769568 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -422,4 +422,12 @@ public class KubernetesOperatorConfigOptions {
                     .durationType()
                     
.defaultValue(LeaderElectionConfiguration.RETRY_PERIOD_DEFAULT_VALUE)
                     .withDescription("Leader election retry period.");
+
+    @Documentation.Section(SECTION_DYNAMIC)
+    public static final ConfigOption<Duration> OPERATOR_JM_SHUTDOWN_TTL =
+            operatorConfig("jm-deployment.shutdown-ttl")
+                    .durationType()
+                    .defaultValue(Duration.ofDays(1))
+                    .withDescription(
+                            "Time after which jobmanager pods of terminal 
application deployments are shut down.");
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index ee4d2304..d1c2c743 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
@@ -47,6 +48,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.List;
@@ -77,6 +79,8 @@ public abstract class AbstractFlinkResourceReconciler<
     public static final String MSG_ROLLBACK = "Rolling back failed 
deployment.";
     public static final String MSG_SUBMIT = "Starting deployment";
 
+    protected Clock clock = Clock.systemDefaultZone();
+
     public AbstractFlinkResourceReconciler(
             KubernetesClient kubernetesClient,
             FlinkConfigManager configManager,
@@ -358,7 +362,7 @@ public abstract class AbstractFlinkResourceReconciler<
 
         Duration readinessTimeout =
                 
configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT);
-        if (!Instant.now()
+        if (!clock.instant()
                 .minus(readinessTimeout)
                 
.isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp())))
 {
             return false;
@@ -447,4 +451,9 @@ public abstract class AbstractFlinkResourceReconciler<
         deployConfig.set(
                 KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE, 
List.of(ownerReference));
     }
+
+    @VisibleForTesting
+    protected void setClock(Clock clock) {
+        this.clock = clock;
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index ae821b96..2c39731c 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -50,6 +50,7 @@ import lombok.SneakyThrows;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Instant;
 import java.util.Optional;
 import java.util.UUID;
 
@@ -281,7 +282,7 @@ public class ApplicationReconciler
             return true;
         }
 
-        return false;
+        return cleanupTerminalJmAfterTtl(deployment, observeConfig);
     }
 
     private boolean shouldRestartJobBecauseUnhealthy(
@@ -318,6 +319,31 @@ public class ApplicationReconciler
         return restartNeeded;
     }
 
+    private boolean cleanupTerminalJmAfterTtl(
+            FlinkDeployment deployment, Configuration observeConfig) {
+        var status = deployment.getStatus();
+        boolean terminal = ReconciliationUtils.isJobInTerminalState(status);
+        boolean jmStillRunning =
+                status.getJobManagerDeploymentStatus() != 
JobManagerDeploymentStatus.MISSING;
+
+        if (terminal && jmStillRunning) {
+            var ttl = 
observeConfig.get(KubernetesOperatorConfigOptions.OPERATOR_JM_SHUTDOWN_TTL);
+            boolean ttlPassed =
+                    clock.instant()
+                            .isAfter(
+                                    Instant.ofEpochMilli(
+                                                    Long.parseLong(
+                                                            
status.getJobStatus().getUpdateTime()))
+                                            .plus(ttl));
+            if (ttlPassed) {
+                LOG.info("Removing JobManager deployment for terminal 
application.");
+                flinkService.deleteClusterDeployment(deployment.getMetadata(), 
status, false);
+                return true;
+            }
+        }
+        return false;
+    }
+
     @Override
     @SneakyThrows
     protected DeleteControl cleanupInternal(FlinkDeployment deployment, 
Context<?> context) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 2208cf80..e68168e8 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.Savepoint;
 import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
@@ -884,4 +885,29 @@ public abstract class AbstractFlinkService implements 
FlinkService {
                     .collect(Collectors.toMap((t) -> t.f0, (t) -> t.f1));
         }
     }
+
+    @Override
+    public final void deleteClusterDeployment(
+            ObjectMeta meta, FlinkDeploymentStatus status, boolean 
deleteHaData) {
+        deleteClusterInternal(meta, deleteHaData);
+        updateStatusAfterClusterDeletion(status);
+    }
+
+    /**
+     * Delete Flink kubernetes cluster by deleting the kubernetes resources 
directly. Optionally
+     * allows deleting the native kubernetes HA resources as well.
+     *
+     * @param meta ObjectMeta of the deployment
+     * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata 
should be removed as well
+     */
+    protected abstract void deleteClusterInternal(ObjectMeta meta, boolean 
deleteHaConfigmaps);
+
+    protected void updateStatusAfterClusterDeletion(FlinkDeploymentStatus 
status) {
+        
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+        var currentJobState = status.getJobStatus().getState();
+        if (currentJobState == null
+                || 
!JobStatus.valueOf(currentJobState).isGloballyTerminalState()) {
+            status.getJobStatus().setState(JobStatus.FINISHED.name());
+        }
+    }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index 1402c8d3..955185cd 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.kubernetes.operator.service;
 
-import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.client.cli.ApplicationDeployer;
 import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
@@ -30,8 +29,6 @@ import 
org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
-import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
-import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.utils.KubernetesUtils;
 
@@ -88,19 +85,6 @@ public class NativeFlinkService extends AbstractFlinkService 
{
         cancelJob(deployment, upgradeMode, configuration, 
deleteClusterAfterSavepoint);
     }
 
-    @Override
-    public void deleteClusterDeployment(
-            ObjectMeta meta, FlinkDeploymentStatus status, boolean 
deleteHaData) {
-        deleteCluster(
-                status,
-                meta,
-                deleteHaData,
-                configManager
-                        .getOperatorConfiguration()
-                        .getFlinkShutdownClusterTimeout()
-                        .toSeconds());
-    }
-
     @Override
     protected PodList getJmPodList(String namespace, String clusterId) {
         return kubernetesClient
@@ -124,20 +108,8 @@ public class NativeFlinkService extends 
AbstractFlinkService {
         LOG.info("Session cluster successfully deployed");
     }
 
-    /**
-     * Delete Flink kubernetes cluster by deleting the kubernetes resources 
directly. Optionally
-     * allows deleting the native kubernetes HA resources as well.
-     *
-     * @param status Deployment status object
-     * @param meta ObjectMeta of the deployment
-     * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata 
should be removed as well
-     * @param shutdownTimeout maximum time allowed for cluster shutdown
-     */
-    private void deleteCluster(
-            FlinkDeploymentStatus status,
-            ObjectMeta meta,
-            boolean deleteHaConfigmaps,
-            long shutdownTimeout) {
+    @Override
+    protected void deleteClusterInternal(ObjectMeta meta, boolean 
deleteHaConfigmaps) {
 
         String namespace = meta.getNamespace();
         String clusterId = meta.getName();
@@ -154,7 +126,13 @@ public class NativeFlinkService extends 
AbstractFlinkService {
 
         if (deleteHaConfigmaps) {
             // We need to wait for cluster shutdown otherwise HA configmaps 
might be recreated
-            waitForClusterShutdown(namespace, clusterId, shutdownTimeout);
+            waitForClusterShutdown(
+                    namespace,
+                    clusterId,
+                    configManager
+                            .getOperatorConfiguration()
+                            .getFlinkShutdownClusterTimeout()
+                            .toSeconds());
             kubernetesClient
                     .configMaps()
                     .inNamespace(namespace)
@@ -163,7 +141,5 @@ public class NativeFlinkService extends 
AbstractFlinkService {
                                     clusterId, 
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
                     .delete();
         }
-        
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
-        status.getJobStatus().setState(JobStatus.FINISHED.name());
     }
 }
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
index 9fb7f3af..a9a5c21f 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
 import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
-import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.Mode;
 import 
org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient;
@@ -83,12 +82,6 @@ public class StandaloneFlinkService extends 
AbstractFlinkService {
         cancelJob(deployment, upgradeMode, conf, true);
     }
 
-    @Override
-    public void deleteClusterDeployment(
-            ObjectMeta meta, FlinkDeploymentStatus status, boolean 
deleteHaData) {
-        deleteClusterInternal(meta, deleteHaData);
-    }
-
     @Override
     protected PodList getJmPodList(String namespace, String clusterId) {
         return kubernetesClient
@@ -136,7 +129,8 @@ public class StandaloneFlinkService extends 
AbstractFlinkService {
         return new 
KubernetesClusterClientFactory().getClusterSpecification(conf);
     }
 
-    private void deleteClusterInternal(ObjectMeta meta, boolean 
deleteHaConfigmaps) {
+    @Override
+    protected void deleteClusterInternal(ObjectMeta meta, boolean 
deleteHaConfigmaps) {
         final String clusterId = meta.getName();
         final String namespace = meta.getNamespace();
 
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 04fff8ee..5d098806 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -32,8 +32,6 @@ import 
org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
-import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
-import 
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
 import org.apache.flink.kubernetes.operator.api.status.Savepoint;
 import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
 import org.apache.flink.kubernetes.operator.api.status.SavepointInfo;
@@ -393,12 +391,9 @@ public class TestingFlinkService extends 
AbstractFlinkService {
     }
 
     @Override
-    public void deleteClusterDeployment(
-            ObjectMeta meta, FlinkDeploymentStatus status, boolean 
deleteHaMeta) {
+    protected void deleteClusterInternal(ObjectMeta meta, boolean 
deleteHaMeta) {
         jobs.clear();
         sessions.remove(meta.getName());
-        
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
-        status.getJobStatus().setState(JobStatus.FINISHED.name());
     }
 
     @Override
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index c2535c3b..b2b2bb11 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -63,6 +63,10 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.platform.commons.util.StringUtils;
 
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -627,4 +631,38 @@ public class ApplicationReconcilerTest {
                 
deployConfig.get(KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE);
         Assertions.assertEquals(expectedOwnerReferences, or);
     }
+
+    @Test
+    public void testTerminalJmTtl() throws Exception {
+        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+        reconciler.reconcile(deployment, context);
+        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+
+        deployment.getSpec().getJob().setState(JobState.SUSPENDED);
+        reconciler.reconcile(deployment, context);
+        var status = deployment.getStatus();
+        assertEquals(
+                org.apache.flink.api.common.JobStatus.FINISHED.toString(),
+                status.getJobStatus().getState());
+        assertEquals(JobManagerDeploymentStatus.READY, 
status.getJobManagerDeploymentStatus());
+
+        deployment
+                .getSpec()
+                .getFlinkConfiguration()
+                .put(
+                        
KubernetesOperatorConfigOptions.OPERATOR_JM_SHUTDOWN_TTL.key(),
+                        String.valueOf(Duration.ofMinutes(5).toMillis()));
+
+        var now = Instant.now();
+        
status.getJobStatus().setUpdateTime(String.valueOf(now.toEpochMilli()));
+
+        reconciler.setClock(Clock.fixed(now.plus(Duration.ofMinutes(3)), 
ZoneId.systemDefault()));
+        reconciler.reconcile(deployment, context);
+        assertEquals(JobManagerDeploymentStatus.READY, 
status.getJobManagerDeploymentStatus());
+
+        reconciler.setClock(Clock.fixed(now.plus(Duration.ofMinutes(6)), 
ZoneId.systemDefault()));
+        reconciler.reconcile(deployment, context);
+        assertEquals(JobManagerDeploymentStatus.MISSING, 
status.getJobManagerDeploymentStatus());
+    }
 }

Reply via email to