This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 302b5fa1e6 [KYUUBI #7101] Load the existing pods when initializing 
kubernetes client to cleanup terminated app pods
302b5fa1e6 is described below

commit 302b5fa1e60708cf8783893d166a22c7e6130309
Author: Wang, Fei <fwan...@ebay.com>
AuthorDate: Sun Jun 22 22:35:14 2025 -0700

    [KYUUBI #7101] Load the existing pods when initializing kubernetes client 
to cleanup terminated app pods
    
    ### Why are the changes needed?
    
    To prevent terminated app pods from leaking if their events are missed during a Kyuubi server restart.
    
    ### How was this patch tested?
    
    Manual test.
    
    ```
    2025-06-17 17:50:37.275 INFO [main] 
org.apache.kyuubi.engine.KubernetesApplicationOperation: 
[KubernetesInfo(Some(28),Some(dls-prod))] Found existing pod 
kyuubi-xb406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-5b406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-90c0b328-930f-11ed-a1eb-0242ac120002-0-20250423211008-grectg-stm-17da59fe-caf4-41e4-a12f-6c1ed9a293f9-driver
 with label: kyuubi-unique-tag=17da59fe-caf4-41e4-a12f-6c1ed9a293f9 in app 
state FINISHED, marking it as terminated
    2025-06-17 17:50:37.278 INFO [main] 
org.apache.kyuubi.engine.KubernetesApplicationOperation: 
[KubernetesInfo(Some(28),Some(dls-prod))] Found existing pod 
kyuubi-xb406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-5b406fc5-7b0b-4fdf-8531-929ed2ae250d-8998-90c0b328-930f-11ed-a1eb-0242ac120002-0-20250423212011-gpdtsi-stm-6a23000f-10be-4a42-ae62-4fa2da8fac07-driver
 with label: kyuubi-unique-tag=6a23000f-10be-4a42-ae62-4fa2da8fac07 in app 
state FINISHED, marking it as terminated
    ```
    The pods are cleaned up eventually.
    <img width="664" alt="image" src="https://github.com/user-attachments/assets/8cf58f61-065f-4fb0-9718-2e3c00e8d2e0" />
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #7101 from turboFei/pod_cleanup.
    
    Closes #7101
    
    7f76cf57c [Wang, Fei] async
    11c9db25d [Wang, Fei] cleanup
    
    Authored-by: Wang, Fei <fwan...@ebay.com>
    Signed-off-by: Wang, Fei <fwan...@ebay.com>
---
 .../engine/KubernetesApplicationOperation.scala    | 46 +++++++++++++++++++++-
 1 file changed, 45 insertions(+), 1 deletion(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index a808030f43..7fe24e0c6b 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -75,9 +75,45 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
 
   private var cleanupCanceledAppPodExecutor: ThreadPoolExecutor = _
 
+  private var kubernetesClientInitializeCleanupTerminatedPodExecutor: 
ThreadPoolExecutor = _
+
   private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): 
KubernetesClient = {
     checkKubernetesInfo(kubernetesInfo)
-    kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo => 
buildKubernetesClient(kInfo))
+    kubernetesClients.computeIfAbsent(
+      kubernetesInfo,
+      kInfo => {
+        val kubernetesClient = buildKubernetesClient(kInfo)
+        cleanTerminatedAppPodsOnKubernetesClientInitialize(kInfo, 
kubernetesClient)
+        kubernetesClient
+      })
+  }
+
+  private def cleanTerminatedAppPodsOnKubernetesClientInitialize(
+      kubernetesInfo: KubernetesInfo,
+      kubernetesClient: KubernetesClient): Unit = {
+    if (kubernetesClientInitializeCleanupTerminatedPodExecutor != null) {
+      kubernetesClientInitializeCleanupTerminatedPodExecutor.submit(new 
Runnable {
+        override def run(): Unit = {
+          val existingPods =
+            
kubernetesClient.pods().withLabel(LABEL_KYUUBI_UNIQUE_KEY).list().getItems
+          info(s"[$kubernetesInfo] Found ${existingPods.size()} existing pods 
with label " +
+            s"$LABEL_KYUUBI_UNIQUE_KEY")
+          val eventType = KubernetesResourceEventTypes.UPDATE
+          existingPods.asScala.filter(isSparkEnginePod).foreach { pod =>
+            val appState = toApplicationState(pod, appStateSource, 
appStateContainer, eventType)
+            if (isTerminated(appState)) {
+              val kyuubiUniqueKey = 
pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)
+              info(s"[$kubernetesInfo] Found existing pod 
${pod.getMetadata.getName} with " +
+                s"${toLabel(kyuubiUniqueKey)} in app state $appState, marking 
it as terminated")
+              if (appInfoStore.get(kyuubiUniqueKey) == null) {
+                updateApplicationState(kubernetesInfo, pod, eventType)
+              }
+              markApplicationTerminated(kubernetesInfo, pod, eventType)
+            }
+          }
+        }
+      })
+    }
   }
 
   private var metadataManager: Option[MetadataManager] = _
@@ -168,6 +204,9 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
       TimeUnit.MILLISECONDS)
     cleanupCanceledAppPodExecutor = ThreadUtils.newDaemonCachedThreadPool(
       "cleanup-canceled-app-pod-thread")
+    kubernetesClientInitializeCleanupTerminatedPodExecutor =
+      ThreadUtils.newDaemonCachedThreadPool(
+        "kubernetes-client-initialize-cleanup-terminated-pod-thread")
     initializeKubernetesClient(kyuubiConf)
   }
 
@@ -321,6 +360,11 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
       ThreadUtils.shutdown(cleanupCanceledAppPodExecutor)
       cleanupCanceledAppPodExecutor = null
     }
+
+    if (kubernetesClientInitializeCleanupTerminatedPodExecutor != null) {
+      
ThreadUtils.shutdown(kubernetesClientInitializeCleanupTerminatedPodExecutor)
+      kubernetesClientInitializeCleanupTerminatedPodExecutor = null
+    }
   }
 
   private class SparkEnginePodEventHandler(kubernetesInfo: KubernetesInfo)

Reply via email to