This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push:
new a8fd1942 [FLINK-34438] Wait for Deployments to be deleted on cluster shutdown
a8fd1942 is described below
commit a8fd19429e93428e8a2498c32def24aa8ebbd4c4
Author: Máté Czagány <[email protected]>
AuthorDate: Tue Feb 20 12:17:54 2024 +0100
[FLINK-34438] Wait for Deployments to be deleted on cluster shutdown
---
.../kubernetes_operator_config_configuration.html | 2 +-
.../shortcodes/generated/system_section.html | 2 +-
.../config/KubernetesOperatorConfigOptions.java | 2 +-
.../operator/service/AbstractFlinkService.java | 85 ++++++++++------------
.../operator/service/NativeFlinkService.java | 6 ++
.../operator/service/StandaloneFlinkService.java | 8 ++
.../kubernetes/operator/TestingFlinkService.java | 5 ++
.../operator/service/AbstractFlinkServiceTest.java | 67 +++++++++++++++++
.../service/StandaloneFlinkServiceTest.java | 14 +---
9 files changed, 132 insertions(+), 59 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 00beffd0..d0680627 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -310,7 +310,7 @@
</tr>
<tr>
<td><h5>kubernetes.operator.resource.cleanup.timeout</h5></td>
- <td style="word-wrap: break-word;">1 min</td>
+ <td style="word-wrap: break-word;">5 min</td>
<td>Duration</td>
<td>The timeout for the resource clean up to wait for flink to shutdown cluster.</td>
</tr>
diff --git a/docs/layouts/shortcodes/generated/system_section.html b/docs/layouts/shortcodes/generated/system_section.html
index 9a0350e6..aa053c2f 100644
--- a/docs/layouts/shortcodes/generated/system_section.html
+++ b/docs/layouts/shortcodes/generated/system_section.html
@@ -106,7 +106,7 @@
</tr>
<tr>
<td><h5>kubernetes.operator.resource.cleanup.timeout</h5></td>
- <td style="word-wrap: break-word;">1 min</td>
+ <td style="word-wrap: break-word;">5 min</td>
<td>Duration</td>
<td>The timeout for the resource clean up to wait for flink to shutdown cluster.</td>
</tr>
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index 62e3ba3e..1334ba4f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -127,7 +127,7 @@ public class KubernetesOperatorConfigOptions {
public static final ConfigOption<Duration>
OPERATOR_RESOURCE_CLEANUP_TIMEOUT =
operatorConfig("resource.cleanup.timeout")
.durationType()
- .defaultValue(Duration.ofSeconds(60))
+ .defaultValue(Duration.ofMinutes(5))
.withDeprecatedKeys(
operatorConfigKey("reconciler.flink.cluster.shutdown.timeout"))
.withDescription(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index 2fafb027..101480da 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -101,8 +101,8 @@ import org.apache.flink.util.Preconditions;
import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
-import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -125,6 +125,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -898,58 +899,50 @@ public abstract class AbstractFlinkService implements FlinkService {
}
}
- /** Wait until the FLink cluster has completely shut down. */
- @VisibleForTesting
- void waitForClusterShutdown(String namespace, String clusterId, long
shutdownTimeout) {
- LOG.info("Waiting for cluster shutdown...");
-
- boolean jobManagerRunning = true;
- boolean taskManagerRunning = true;
- boolean serviceRunning = true;
+ /**
+ * Returns the names of the Kubernetes Deployments belonging to the given cluster.
+ * {@link #waitForClusterShutdown} waits, in list order, for each of them to be removed.
+ */
+ protected abstract List<String> getDeploymentNames(String namespace,
String clusterId);
- for (int i = 0; i < shutdownTimeout; i++) {
- if (jobManagerRunning) {
- PodList jmPodList = getJmPodList(namespace, clusterId);
+ /**
+ * Wait until the Flink cluster has completely shut down, i.e. until every Deployment returned
+ * by {@link #getDeploymentNames} has been removed.
+ *
+ * <p>{@code shutdownTimeout} is given in seconds and is a single budget shared by all
+ * Deployments, which are awaited one after another. If a wait times out, an error is logged
+ * and the method returns without waiting for the remaining Deployments.
+ */
+ protected void waitForClusterShutdown(
+ String namespace, String clusterId, long shutdownTimeout) {
+ long timeoutAt = System.currentTimeMillis() + shutdownTimeout * 1000;
+ LOG.info("Waiting {} seconds for cluster shutdown...",
shutdownTimeout);
- if (jmPodList == null || jmPodList.getItems().isEmpty()) {
- jobManagerRunning = false;
- }
- }
- if (taskManagerRunning) {
- PodList tmPodList = getTmPodList(namespace, clusterId);
+ for (var deploymentName : getDeploymentNames(namespace, clusterId)) {
+ // Remaining share of the overall budget, in milliseconds.
+ long deploymentTimeout = timeoutAt - System.currentTimeMillis();
+ // NOTE(review): deploymentTimeout is not checked before use; once earlier waits have used
+ // up the budget it can be zero or negative. Confirm how waitUntilCondition handles that.
- if (tmPodList.getItems().isEmpty()) {
- taskManagerRunning = false;
- }
+ if (!waitForDeploymentToBeRemoved(namespace, deploymentName,
deploymentTimeout)) {
+ LOG.error(
+ "Failed to shut down cluster {} (deployment {}) in {}
seconds, proceeding...",
+ clusterId,
+ deploymentName,
+ shutdownTimeout);
+ return;
}
+ }
+ }
- if (serviceRunning) {
- Service service =
- kubernetesClient
- .services()
- .inNamespace(namespace)
- .withName(
-
ExternalServiceDecorator.getExternalServiceName(clusterId))
- .get();
- if (service == null) {
- serviceRunning = false;
- }
- }
+ /**
+ * Wait until the given Deployment no longer exists.
+ *
+ * @param timeout maximum time to wait, in milliseconds
+ * @return {@code true} if the Deployment was gone before the timeout, {@code false} if the
+ *     wait timed out
+ */
+ @VisibleForTesting
+ boolean waitForDeploymentToBeRemoved(String namespace, String
deploymentName, long timeout) {
+ LOG.info(
+ "Waiting for Deployment {} to shut down with {} seconds
timeout...",
+ deploymentName,
+ timeout / 1000);
- if (!jobManagerRunning && !serviceRunning && !taskManagerRunning) {
- break;
- }
- // log a message waiting to shutdown Flink cluster every 5 seconds.
- if ((i + 1) % 5 == 0) {
- LOG.info("Waiting for cluster shutdown... ({}s)", i + 1);
- }
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
+ try {
+ // The condition sees null once the Deployment no longer exists.
+ kubernetesClient
+ .apps()
+ .deployments()
+ .inNamespace(namespace)
+ .withName(deploymentName)
+ .waitUntilCondition(Objects::isNull, timeout,
TimeUnit.MILLISECONDS);
+
+ LOG.info("Deployment {} successfully shut down", deploymentName);
+ // Only a timeout is mapped to false; any other client exception propagates.
+ } catch (KubernetesClientTimeoutException e) {
+ return false;
}
- LOG.info("Cluster shutdown completed.");
+ return true;
}
private static List<JobStatusMessage> toJobStatusMessage(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index 52bb398e..fe5cde06 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -62,6 +62,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -132,6 +133,11 @@ public class NativeFlinkService extends AbstractFlinkService {
return new PodList();
}
+ /** Returns the single Deployment name derived from {@code clusterId}; {@code namespace} is unused. */
+ @Override
+ protected List<String> getDeploymentNames(String namespace, String
clusterId) {
+ return List.of(KubernetesUtils.getDeploymentName(clusterId));
+ }
+
protected void submitClusterInternal(Configuration conf) throws Exception {
LOG.info("Deploying session cluster");
final ClusterClientServiceLoader clusterClientServiceLoader =
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
index c0439d04..0c3e7477 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -47,6 +47,7 @@ import io.fabric8.kubernetes.client.KubernetesClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -104,6 +105,13 @@ public class StandaloneFlinkService extends AbstractFlinkService {
.list();
}
+ /**
+ * Returns the JobManager and TaskManager Deployment names, in that order; {@code namespace} is
+ * unused.
+ */
+ @Override
+ protected List<String> getDeploymentNames(String namespace, String
clusterId) {
+ return List.of(
+
StandaloneKubernetesUtils.getJobManagerDeploymentName(clusterId),
+
StandaloneKubernetesUtils.getTaskManagerDeploymentName(clusterId));
+ }
+
@VisibleForTesting
protected FlinkStandaloneKubeClient
createNamespacedKubeClient(Configuration configuration) {
final int poolSize =
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 14fa2a9d..d2ad412a 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -628,4 +628,9 @@ public class TestingFlinkService extends AbstractFlinkService {
public JobDetailsInfo getJobDetailsInfo(JobID jobID, Configuration conf) {
return NativeFlinkServiceTest.createJobDetailsFor(List.of());
}
+
+ /** Test stub: returns one Deployment name, {@code clusterId + "-deployment"}. */
+ @Override
+ protected List<String> getDeploymentNames(String namespace, String
clusterId) {
+ return List.of(clusterId + "-deployment");
+ }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index 637e3bed..a2b09647 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -95,10 +95,17 @@ import org.apache.flink.util.function.TriFunction;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.DeletionPropagation;
+import io.fabric8.kubernetes.api.model.ListMeta;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
+import io.fabric8.kubernetes.api.model.WatchEvent;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
+import io.fabric8.kubernetes.api.model.apps.DeploymentList;
+import io.fabric8.kubernetes.api.model.apps.DeploymentListBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
+import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -108,6 +115,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.io.IOException;
+import java.net.HttpURLConnection;
import java.net.ServerSocket;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -144,6 +152,7 @@ public class AbstractFlinkServiceTest {
File testJar;
private KubernetesClient client;
+ private KubernetesMockServer mockServer;
private final Configuration configuration = new Configuration();
private final FlinkConfigManager configManager = new
FlinkConfigManager(configuration);
@@ -1046,6 +1055,59 @@ public class AbstractFlinkServiceTest {
});
}
+ /**
+ * Verifies that {@code waitForDeploymentToBeRemoved} returns {@code true} when the mock server
+ * first lists the Deployment and then emits a DELETED watch event for it, and that the server
+ * receives exactly two requests.
+ *
+ * <p>NOTE(review): despite its name, this test calls {@code waitForDeploymentToBeRemoved}
+ * directly rather than {@code waitForClusterShutdown}.
+ */
+ @Test
+ public void testWaitForClusterShutdown() {
+ String deploymentName = "test-cluster";
+ String namespace = "test-namespace";
+ // Expected list and watch request paths. These presumably must match the client's
+ // requests exactly, query parameters included.
+ String getUrl =
+ String.format(
+
"/apis/apps/v1/namespaces/%s/deployments?fieldSelector=metadata.name%%3D%s",
+ namespace, deploymentName);
+ String watchUrl =
+ String.format(
+
"/apis/apps/v1/namespaces/%s/deployments?fieldSelector=metadata.name%%3D%s&timeoutSeconds=600&allowWatchBookmarks=true&watch=true",
+ namespace, deploymentName);
+
+ var flinkService = new TestingService(null);
+
+ Deployment deployment =
+ new DeploymentBuilder()
+ .withNewMetadata()
+ .withName(deploymentName)
+ .withNamespace(namespace)
+ .endMetadata()
+ .build();
+
+ DeploymentList deploymentList =
+ new DeploymentListBuilder()
+ .withMetadata(new ListMeta())
+ .withItems(deployment)
+ .build();
+
+ // The initial list request still sees the Deployment.
+ mockServer
+ .expect()
+ .get()
+ .withPath(getUrl)
+ .andReturn(HttpURLConnection.HTTP_OK, deploymentList)
+ .once();
+ // The watch then reports the Deployment as DELETED after a short delay (the unit of
+ // waitFor is presumably milliseconds; confirm).
+ mockServer
+ .expect()
+ .get()
+ .withPath(watchUrl)
+ .andUpgradeToWebSocket()
+ .open()
+ .waitFor(10)
+ .andEmit(new WatchEvent(deployment, "DELETED"))
+ .done()
+ .always();
+
+ boolean result =
+ flinkService.waitForDeploymentToBeRemoved(namespace,
deploymentName, 10000);
+
+ assertTrue(result);
+ assertEquals(2, mockServer.getRequestCount());
+ }
+
class TestingService extends AbstractFlinkService {
RestClusterClient<String> clusterClient;
@@ -1089,6 +1151,11 @@ public class AbstractFlinkServiceTest {
return tmPods.getOrDefault(Tuple2.of(namespace, clusterId), new
PodList());
}
+ /** Uses the cluster id itself as the only Deployment name. */
+ @Override
+ protected List<String> getDeploymentNames(String namespace, String
clusterId) {
+ return List.of(clusterId);
+ }
+
@Override
protected void deployApplicationCluster(JobSpec jobSpec, Configuration
conf) {
throw new UnsupportedOperationException();
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
index f583df92..7c1e2175 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
@@ -33,8 +33,6 @@ import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
import org.apache.flink.util.concurrent.Executors;
import io.fabric8.kubernetes.api.model.ObjectMeta;
-import io.fabric8.kubernetes.api.model.Pod;
-import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
@@ -121,7 +119,8 @@ public class StandaloneFlinkServiceTest {
new Configuration(),
true);
- assertEquals(2, service.nbCall);
+ // How many times were getDeploymentNames() called
+ assertEquals(1, service.nbCall);
deployments = kubernetesClient.apps().deployments().list().getItems();
@@ -290,14 +289,9 @@ public class StandaloneFlinkServiceTest {
}
+ /** Counts calls in {@code nbCall} and returns the cluster id as the only Deployment name. */
@Override
- protected PodList getTmPodList(String namespace, String clusterId) {
+ protected List<String> getDeploymentNames(String namespace, String
clusterId) {
nbCall++;
- PodList podList = new PodList();
- if (nbCall == 1) {
- Pod pod = new Pod();
- podList.setItems(List.of(pod));
- }
- return podList;
+ return List.of(clusterId);
}
}