This is an automated email from the ASF dual-hosted git repository. feiwang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push: new 302b5fa1e6 [KYUUBI #7101] Load the existing pods when initializing kubernetes client to cleanup terminated app pods 302b5fa1e6 is described below commit 302b5fa1e60708cf8783893d166a22c7e6130309 Author: Wang, Fei <fwan...@ebay.com> AuthorDate: Sun Jun 22 22:35:14 2025 -0700 [KYUUBI #7101] Load the existing pods when initializing kubernetes client to cleanup terminated app pods ### Why are the changes needed? To prevent terminated app pods from leaking if their events are missed during a Kyuubi server restart. ### How was this patch tested? Manual test. ``` :2025-06-17 17:50:37.275 INFO [main] org.apache.kyuubi.engine.KubernetesApplicationOperation: [KubernetesInfo(Some(28),Some(dls-prod))] Found existing pod kyuubi-xb406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-5b406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-90c0b328-930f-11ed-a1eb-0242ac120002-0-20250423211008-grectg-stm-17da59fe-caf4-41e4-a12f-6c1ed9a293f9-driver with label: kyuubi-unique-tag=17da59fe-caf4-41e4-a12f-6c1ed9a293f9 in app state FINISHED, marking it as terminated 2025-06-17 17:50:37.278 INFO [main] org.apache.kyuubi.engine.KubernetesApplicationOperation: [KubernetesInfo(Some(28),Some(dls-prod))] Found existing pod kyuubi-xb406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-5b406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-90c0b328-930f-11ed-a1eb-0242ac120002-0-20250423212011-gpdtsi-stm-6a23000f-10be-4a42-ae62-4fa2da8fac07-driver with label: kyuubi-unique-tag=6a23000f-10be-4a42-ae62-4fa2da8fac07 in app state FINISHED, marking it as terminated ``` The pods are cleaned up eventually. <img width="664" alt="image" src="https://github.com/user-attachments/assets/8cf58f61-065f-4fb0-9718-2e3c00e8d2e0" /> ### Was this patch authored or co-authored using generative AI tooling? No. Closes #7101 from turboFei/pod_cleanup. 
Closes #7101 7f76cf57c [Wang, Fei] async 11c9db25d [Wang, Fei] cleanup Authored-by: Wang, Fei <fwan...@ebay.com> Signed-off-by: Wang, Fei <fwan...@ebay.com> --- .../engine/KubernetesApplicationOperation.scala | 46 +++++++++++++++++++++- 1 file changed, 45 insertions(+), 1 deletion(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index a808030f43..7fe24e0c6b 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -75,9 +75,45 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { private var cleanupCanceledAppPodExecutor: ThreadPoolExecutor = _ + private var kubernetesClientInitializeCleanupTerminatedPodExecutor: ThreadPoolExecutor = _ + private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = { checkKubernetesInfo(kubernetesInfo) - kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo => buildKubernetesClient(kInfo)) + kubernetesClients.computeIfAbsent( + kubernetesInfo, + kInfo => { + val kubernetesClient = buildKubernetesClient(kInfo) + cleanTerminatedAppPodsOnKubernetesClientInitialize(kInfo, kubernetesClient) + kubernetesClient + }) + } + + private def cleanTerminatedAppPodsOnKubernetesClientInitialize( + kubernetesInfo: KubernetesInfo, + kubernetesClient: KubernetesClient): Unit = { + if (kubernetesClientInitializeCleanupTerminatedPodExecutor != null) { + kubernetesClientInitializeCleanupTerminatedPodExecutor.submit(new Runnable { + override def run(): Unit = { + val existingPods = + kubernetesClient.pods().withLabel(LABEL_KYUUBI_UNIQUE_KEY).list().getItems + info(s"[$kubernetesInfo] Found ${existingPods.size()} existing pods with label " + + s"$LABEL_KYUUBI_UNIQUE_KEY") + val eventType = 
KubernetesResourceEventTypes.UPDATE + existingPods.asScala.filter(isSparkEnginePod).foreach { pod => + val appState = toApplicationState(pod, appStateSource, appStateContainer, eventType) + if (isTerminated(appState)) { + val kyuubiUniqueKey = pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY) + info(s"[$kubernetesInfo] Found existing pod ${pod.getMetadata.getName} with " + + s"${toLabel(kyuubiUniqueKey)} in app state $appState, marking it as terminated") + if (appInfoStore.get(kyuubiUniqueKey) == null) { + updateApplicationState(kubernetesInfo, pod, eventType) + } + markApplicationTerminated(kubernetesInfo, pod, eventType) + } + } + } + }) + } } private var metadataManager: Option[MetadataManager] = _ @@ -168,6 +204,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { TimeUnit.MILLISECONDS) cleanupCanceledAppPodExecutor = ThreadUtils.newDaemonCachedThreadPool( "cleanup-canceled-app-pod-thread") + kubernetesClientInitializeCleanupTerminatedPodExecutor = + ThreadUtils.newDaemonCachedThreadPool( + "kubernetes-client-initialize-cleanup-terminated-pod-thread") initializeKubernetesClient(kyuubiConf) } @@ -321,6 +360,11 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { ThreadUtils.shutdown(cleanupCanceledAppPodExecutor) cleanupCanceledAppPodExecutor = null } + + if (kubernetesClientInitializeCleanupTerminatedPodExecutor != null) { + ThreadUtils.shutdown(kubernetesClientInitializeCleanupTerminatedPodExecutor) + kubernetesClientInitializeCleanupTerminatedPodExecutor = null + } } private class SparkEnginePodEventHandler(kubernetesInfo: KubernetesInfo)