This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new 6e597d4c [FLINK-29609] Shut down JM for terminated applications after configured duration
6e597d4c is described below
commit 6e597d4c0a144a01840b40e6b669e558970c55d0
Author: Gyula Fora <[email protected]>
AuthorDate: Fri Nov 11 14:41:11 2022 +0100
[FLINK-29609] Shut down JM for terminated applications after configured duration
---
.../shortcodes/generated/dynamic_section.html | 6 ++++
.../kubernetes_operator_config_configuration.html | 6 ++++
.../config/KubernetesOperatorConfigOptions.java | 8 +++++
.../AbstractFlinkResourceReconciler.java | 11 +++++-
.../deployment/ApplicationReconciler.java | 28 ++++++++++++++-
.../operator/service/AbstractFlinkService.java | 26 ++++++++++++++
.../operator/service/NativeFlinkService.java | 42 +++++-----------------
.../operator/service/StandaloneFlinkService.java | 10 ++----
.../kubernetes/operator/TestingFlinkService.java | 7 +---
.../deployment/ApplicationReconcilerTest.java | 38 ++++++++++++++++++++
10 files changed, 133 insertions(+), 49 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/dynamic_section.html
b/docs/layouts/shortcodes/generated/dynamic_section.html
index 6bfb39cc..51e1b313 100644
--- a/docs/layouts/shortcodes/generated/dynamic_section.html
+++ b/docs/layouts/shortcodes/generated/dynamic_section.html
@@ -44,6 +44,12 @@
<td>Boolean</td>
<td>Whether to enable recovery of missing/deleted jobmanager
deployments.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.jm-deployment.shutdown-ttl</h5></td>
+ <td style="word-wrap: break-word;">86400000 ms</td>
+ <td>Duration</td>
+ <td>Time after which jobmanager pods of terminal application deployments are shut down.</td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.job.restart.failed</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index b243d659..5e69a439 100644
---
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -122,6 +122,12 @@
<td>Boolean</td>
<td>Whether to enable recovery of missing/deleted jobmanager
deployments.</td>
</tr>
+ <tr>
+ <td><h5>kubernetes.operator.jm-deployment.shutdown-ttl</h5></td>
+ <td style="word-wrap: break-word;">86400000 ms</td>
+ <td>Duration</td>
+ <td>Time after which jobmanager pods of terminal application deployments are shut down.</td>
+ </tr>
<tr>
<td><h5>kubernetes.operator.job.restart.failed</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index 508de68e..ea769568 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -422,4 +422,12 @@ public class KubernetesOperatorConfigOptions {
.durationType()
.defaultValue(LeaderElectionConfiguration.RETRY_PERIOD_DEFAULT_VALUE)
.withDescription("Leader election retry period.");
+
+ @Documentation.Section(SECTION_DYNAMIC)
+ public static final ConfigOption<Duration> OPERATOR_JM_SHUTDOWN_TTL =
+ operatorConfig("jm-deployment.shutdown-ttl")
+ .durationType()
+ .defaultValue(Duration.ofDays(1))
+ .withDescription(
+ "Time after which jobmanager pods of terminal application deployments are shut down.");
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index ee4d2304..d1c2c743 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -17,6 +17,7 @@
package org.apache.flink.kubernetes.operator.reconciler.deployment;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
@@ -47,6 +48,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
@@ -77,6 +79,8 @@ public abstract class AbstractFlinkResourceReconciler<
public static final String MSG_ROLLBACK = "Rolling back failed deployment.";
public static final String MSG_SUBMIT = "Starting deployment";
+ protected Clock clock = Clock.systemDefaultZone();
+
public AbstractFlinkResourceReconciler(
KubernetesClient kubernetesClient,
FlinkConfigManager configManager,
@@ -358,7 +362,7 @@ public abstract class AbstractFlinkResourceReconciler<
Duration readinessTimeout =
configuration.get(KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT);
- if (!Instant.now()
+ if (!clock.instant()
.minus(readinessTimeout)
.isAfter(Instant.ofEpochMilli(reconciliationStatus.getReconciliationTimestamp())))
{
return false;
@@ -447,4 +451,9 @@ public abstract class AbstractFlinkResourceReconciler<
deployConfig.set(
KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE,
List.of(ownerReference));
}
+
+ @VisibleForTesting
+ protected void setClock(Clock clock) {
+ this.clock = clock;
+ }
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index ae821b96..2c39731c 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -50,6 +50,7 @@ import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
@@ -281,7 +282,7 @@ public class ApplicationReconciler
return true;
}
- return false;
+ return cleanupTerminalJmAfterTtl(deployment, observeConfig);
}
private boolean shouldRestartJobBecauseUnhealthy(
@@ -318,6 +319,31 @@ public class ApplicationReconciler
return restartNeeded;
}
+ private boolean cleanupTerminalJmAfterTtl(
+ FlinkDeployment deployment, Configuration observeConfig) {
+ var status = deployment.getStatus();
+ boolean terminal = ReconciliationUtils.isJobInTerminalState(status);
+ boolean jmStillRunning =
+ status.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.MISSING;
+
+ if (terminal && jmStillRunning) {
+ var ttl = observeConfig.get(KubernetesOperatorConfigOptions.OPERATOR_JM_SHUTDOWN_TTL);
+ boolean ttlPassed =
+ clock.instant()
+ .isAfter(
+ Instant.ofEpochMilli(
+ Long.parseLong(
+ status.getJobStatus().getUpdateTime()))
+ .plus(ttl));
+ if (ttlPassed) {
+ LOG.info("Removing JobManager deployment for terminal application.");
+ flinkService.deleteClusterDeployment(deployment.getMetadata(), status, false);
+ return true;
+ }
+ }
+ return false;
+ }
+
@Override
@SneakyThrows
protected DeleteControl cleanupInternal(FlinkDeployment deployment,
Context<?> context) {
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 2208cf80..e68168e8 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -34,6 +34,7 @@ import
org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
+import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
@@ -884,4 +885,29 @@ public abstract class AbstractFlinkService implements
FlinkService {
.collect(Collectors.toMap((t) -> t.f0, (t) -> t.f1));
}
}
+
+ @Override
+ public final void deleteClusterDeployment(
+ ObjectMeta meta, FlinkDeploymentStatus status, boolean deleteHaData) {
+ deleteClusterInternal(meta, deleteHaData);
+ updateStatusAfterClusterDeletion(status);
+ }
+
+ /**
+ * Delete Flink kubernetes cluster by deleting the kubernetes resources directly. Optionally
+ * allows deleting the native kubernetes HA resources as well.
+ *
+ * @param meta ObjectMeta of the deployment
+ * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata should be removed as well
+ */
+ protected abstract void deleteClusterInternal(ObjectMeta meta, boolean deleteHaConfigmaps);
+
+ protected void updateStatusAfterClusterDeletion(FlinkDeploymentStatus status) {
+ status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
+ var currentJobState = status.getJobStatus().getState();
+ if (currentJobState == null
+ || !JobStatus.valueOf(currentJobState).isGloballyTerminalState()) {
+ status.getJobStatus().setState(JobStatus.FINISHED.name());
+ }
+ }
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index 1402c8d3..955185cd 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -17,7 +17,6 @@
package org.apache.flink.kubernetes.operator.service;
-import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.cli.ApplicationDeployer;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterClientServiceLoader;
@@ -30,8 +29,6 @@ import
org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
-import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
-import
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.utils.KubernetesUtils;
@@ -88,19 +85,6 @@ public class NativeFlinkService extends AbstractFlinkService
{
cancelJob(deployment, upgradeMode, configuration,
deleteClusterAfterSavepoint);
}
- @Override
- public void deleteClusterDeployment(
- ObjectMeta meta, FlinkDeploymentStatus status, boolean
deleteHaData) {
- deleteCluster(
- status,
- meta,
- deleteHaData,
- configManager
- .getOperatorConfiguration()
- .getFlinkShutdownClusterTimeout()
- .toSeconds());
- }
-
@Override
protected PodList getJmPodList(String namespace, String clusterId) {
return kubernetesClient
@@ -124,20 +108,8 @@ public class NativeFlinkService extends
AbstractFlinkService {
LOG.info("Session cluster successfully deployed");
}
- /**
- * Delete Flink kubernetes cluster by deleting the kubernetes resources
directly. Optionally
- * allows deleting the native kubernetes HA resources as well.
- *
- * @param status Deployment status object
- * @param meta ObjectMeta of the deployment
- * @param deleteHaConfigmaps Flag to indicate whether k8s HA metadata
should be removed as well
- * @param shutdownTimeout maximum time allowed for cluster shutdown
- */
- private void deleteCluster(
- FlinkDeploymentStatus status,
- ObjectMeta meta,
- boolean deleteHaConfigmaps,
- long shutdownTimeout) {
+ @Override
+ protected void deleteClusterInternal(ObjectMeta meta, boolean
deleteHaConfigmaps) {
String namespace = meta.getNamespace();
String clusterId = meta.getName();
@@ -154,7 +126,13 @@ public class NativeFlinkService extends
AbstractFlinkService {
if (deleteHaConfigmaps) {
// We need to wait for cluster shutdown otherwise HA configmaps
might be recreated
- waitForClusterShutdown(namespace, clusterId, shutdownTimeout);
+ waitForClusterShutdown(
+ namespace,
+ clusterId,
+ configManager
+ .getOperatorConfiguration()
+ .getFlinkShutdownClusterTimeout()
+ .toSeconds());
kubernetesClient
.configMaps()
.inNamespace(namespace)
@@ -163,7 +141,5 @@ public class NativeFlinkService extends
AbstractFlinkService {
clusterId,
LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY))
.delete();
}
-
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
- status.getJobStatus().setState(JobStatus.FINISHED.name());
}
}
diff --git
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
index 9fb7f3af..a9a5c21f 100644
---
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
+++
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -28,7 +28,6 @@ import
org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
-import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.Mode;
import
org.apache.flink.kubernetes.operator.kubeclient.Fabric8FlinkStandaloneKubeClient;
@@ -83,12 +82,6 @@ public class StandaloneFlinkService extends
AbstractFlinkService {
cancelJob(deployment, upgradeMode, conf, true);
}
- @Override
- public void deleteClusterDeployment(
- ObjectMeta meta, FlinkDeploymentStatus status, boolean
deleteHaData) {
- deleteClusterInternal(meta, deleteHaData);
- }
-
@Override
protected PodList getJmPodList(String namespace, String clusterId) {
return kubernetesClient
@@ -136,7 +129,8 @@ public class StandaloneFlinkService extends
AbstractFlinkService {
return new
KubernetesClusterClientFactory().getClusterSpecification(conf);
}
- private void deleteClusterInternal(ObjectMeta meta, boolean
deleteHaConfigmaps) {
+ @Override
+ protected void deleteClusterInternal(ObjectMeta meta, boolean
deleteHaConfigmaps) {
final String clusterId = meta.getName();
final String namespace = meta.getNamespace();
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 04fff8ee..5d098806 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -32,8 +32,6 @@ import
org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
-import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
-import
org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
import org.apache.flink.kubernetes.operator.api.status.SavepointInfo;
@@ -393,12 +391,9 @@ public class TestingFlinkService extends
AbstractFlinkService {
}
@Override
- public void deleteClusterDeployment(
- ObjectMeta meta, FlinkDeploymentStatus status, boolean
deleteHaMeta) {
+ protected void deleteClusterInternal(ObjectMeta meta, boolean
deleteHaMeta) {
jobs.clear();
sessions.remove(meta.getName());
-
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
- status.getJobStatus().setState(JobStatus.FINISHED.name());
}
@Override
diff --git
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index c2535c3b..b2b2bb11 100644
---
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -63,6 +63,10 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.platform.commons.util.StringUtils;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -627,4 +631,38 @@ public class ApplicationReconcilerTest {
deployConfig.get(KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE);
Assertions.assertEquals(expectedOwnerReferences, or);
}
+
+ @Test
+ public void testTerminalJmTtl() throws Exception {
+ FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+ reconciler.reconcile(deployment, context);
+ verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+
+ deployment.getSpec().getJob().setState(JobState.SUSPENDED);
+ reconciler.reconcile(deployment, context);
+ var status = deployment.getStatus();
+ assertEquals(
+ org.apache.flink.api.common.JobStatus.FINISHED.toString(),
+ status.getJobStatus().getState());
+ assertEquals(JobManagerDeploymentStatus.READY, status.getJobManagerDeploymentStatus());
+
+ deployment
+ .getSpec()
+ .getFlinkConfiguration()
+ .put(
+ KubernetesOperatorConfigOptions.OPERATOR_JM_SHUTDOWN_TTL.key(),
+ String.valueOf(Duration.ofMinutes(5).toMillis()));
+
+ var now = Instant.now();
+ status.getJobStatus().setUpdateTime(String.valueOf(now.toEpochMilli()));
+
+ reconciler.setClock(Clock.fixed(now.plus(Duration.ofMinutes(3)), ZoneId.systemDefault()));
+ reconciler.reconcile(deployment, context);
+ assertEquals(JobManagerDeploymentStatus.READY, status.getJobManagerDeploymentStatus());
+
+ reconciler.setClock(Clock.fixed(now.plus(Duration.ofMinutes(6)), ZoneId.systemDefault()));
+ reconciler.reconcile(deployment, context);
+ assertEquals(JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus());
+ }
}