This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 88fae49d7 [KYUUBI #5714] [K8S] Support to cleanup the spark driver pod 
after application terminates for retain period
88fae49d7 is described below

commit 88fae49d7f28884d7d022d329b789e7cb728132c
Author: fwang12 <[email protected]>
AuthorDate: Thu Nov 16 19:33:00 2023 +0800

    [KYUUBI #5714] [K8S] Support to cleanup the spark driver pod after 
application terminates for retain period
    
    # :mag: Description
    
    ## Describe Your Solution 🔧
    
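    As the title says, support deleting the Spark driver pod once the application has terminated and the retain period (`kyuubi.kubernetes.terminatedApplicationRetainPeriod`) has elapsed. This behavior is disabled by default and is enabled with `kyuubi.kubernetes.spark.deleteDriverPodOnTermination.enabled`.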
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [x] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan 🧪
    
    Tested locally.
    
    ---
    
    # Checklists
    ## 📝 Author Self Checklist
    
    - [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
    - [x] I have performed a self-review
    - [x] I have commented my code, particularly in hard-to-understand areas
    - [x] I have made corresponding changes to the documentation
    - [ ] My changes generate no new warnings
    - [ ] I have added tests that prove my fix is effective or that my feature 
works
    - [ ] New and existing unit tests pass locally with my changes
    - [x] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    ## 📝 Committer Pre-Merge Checklist
    
    - [x] Pull request title is okay.
    - [x] No license issues.
    - [x] Milestone correctly set?
    - [ ] Test coverage is ok
    - [x] Assignees are selected.
    - [x] Minimum number of approvals
    - [ ] No changes are requested
    
    **Be nice. Be informative.**
    
    Closes #5714 from turboFei/kill_k8s_pod.
    
    Closes #5714
    
    1e0787868 [fwang12] doc
    0c9ff1a44 [fwang12] cleanup pod
    ab95d4c03 [fwang12] save
    
    Authored-by: fwang12 <[email protected]>
    Signed-off-by: fwang12 <[email protected]>
---
 docs/configuration/settings.md                     |  1 +
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  8 +++++
 .../engine/KubernetesApplicationOperation.scala    | 41 ++++++++++++++++------
 3 files changed, 40 insertions(+), 10 deletions(-)

diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 5577da674..b14a8ada1 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -322,6 +322,7 @@ You can configure the Kyuubi properties in 
`$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.kubernetes.master.address                                     | 
&lt;undefined&gt; | The internal Kubernetes master (API server) address to be 
used for kyuubi.                                                                
                                                                                
                                                              | string   | 
1.7.0 |
 | kyuubi.kubernetes.namespace                                          | 
default           | The namespace that will be used for running the kyuubi pods 
and find engines.                                                               
                                                                                
                                                            | string   | 1.7.0 |
 | kyuubi.kubernetes.namespace.allow.list                                       
           || The allowed kubernetes namespace list, if it is empty, there is 
no kubernetes namespace limitation.                                             
                                                                                
                                                        | set      | 1.8.0 |
+| kyuubi.kubernetes.spark.deleteDriverPodOnTermination.enabled         | false 
            | If set to true then Kyuubi server will delete the spark driver 
pod after the application terminates for 
kyuubi.kubernetes.terminatedApplicationRetainPeriod.                            
                                                                                
                | boolean  | 1.8.1 |
 | kyuubi.kubernetes.spark.forciblyRewriteDriverPodName.enabled         | false 
            | Whether to forcibly rewrite Spark driver pod name with 
'kyuubi-<uuid>-driver'. If disabled, Kyuubi will try to preserve the 
application name while satisfying K8s' pod name policy, but some vendors may 
have stricter pod name policies, thus the generated name may become illegal.   
| boolean  | 1.8.1 |
 | kyuubi.kubernetes.spark.forciblyRewriteExecutorPodNamePrefix.enabled | false 
            | Whether to forcibly rewrite Spark executor pod name prefix with 
'kyuubi-<uuid>'. If disabled, Kyuubi will try to preserve the application name 
while satisfying K8s' pod name policy, but some vendors may have stricter Pod 
name policies, thus the generated name may become illegal. | boolean  | 1.8.1 |
 | kyuubi.kubernetes.terminatedApplicationRetainPeriod                  | PT5M  
            | The period for which the Kyuubi server retains application 
information after the application terminates.                                   
                                                                                
                                                             | duration | 1.7.1 
|
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index a48f4ba9f..373c0190b 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1231,6 +1231,14 @@ object KyuubiConf {
       .checkValue(_ > 0, "must be positive number")
       .createWithDefault(Duration.ofMinutes(5).toMillis)
 
+  val KUBERNETES_SPARK_DELETE_DRIVER_POD_ON_TERMINATION_ENABLED: 
ConfigEntry[Boolean] =
+    buildConf("kyuubi.kubernetes.spark.deleteDriverPodOnTermination.enabled")
+      .doc("If set to true then Kyuubi server will delete the spark driver pod 
after " +
+        s"the application terminates for 
${KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD.key}.")
+      .version("1.8.1")
+      .booleanConf
+      .createWithDefault(false)
+
   // 
///////////////////////////////////////////////////////////////////////////////////////////////
   //                                 SQL Engine Configuration                  
                  //
   // 
///////////////////////////////////////////////////////////////////////////////////////////////
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index 16a0c29d1..a78dd8eb6 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -21,6 +21,7 @@ import java.util.Locale
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
 
 import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
 import io.fabric8.kubernetes.api.model.Pod
@@ -49,8 +50,8 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
     kyuubiConf.get(KyuubiConf.KUBERNETES_NAMESPACE_ALLOW_LIST)
 
   // key is kyuubi_unique_key
-  private val appInfoStore: ConcurrentHashMap[String, ApplicationInfo] =
-    new ConcurrentHashMap[String, ApplicationInfo]
+  private val appInfoStore: ConcurrentHashMap[String, (KubernetesInfo, 
ApplicationInfo)] =
+    new ConcurrentHashMap[String, (KubernetesInfo, ApplicationInfo)]
   // key is kyuubi_unique_key
   private var cleanupTerminatedAppInfoTrigger: Cache[String, ApplicationState] 
= _
 
@@ -98,10 +99,29 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
     submitTimeout = conf.get(KyuubiConf.ENGINE_KUBERNETES_SUBMIT_TIMEOUT)
     // Defer cleaning terminated application information
     val retainPeriod = 
conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD)
+    val deleteSparkDriverPodOnTermination =
+      
conf.get(KyuubiConf.KUBERNETES_SPARK_DELETE_DRIVER_POD_ON_TERMINATION_ENABLED)
     cleanupTerminatedAppInfoTrigger = CacheBuilder.newBuilder()
       .expireAfterWrite(retainPeriod, TimeUnit.MILLISECONDS)
       .removalListener((notification: RemovalNotification[String, 
ApplicationState]) => {
-        Option(appInfoStore.remove(notification.getKey)).foreach { removed =>
+        Option(appInfoStore.remove(notification.getKey)).foreach { case 
(kubernetesInfo, removed) =>
+          if (deleteSparkDriverPodOnTermination) {
+            try {
+              val kubernetesClient = 
getOrCreateKubernetesClient(kubernetesInfo)
+              if 
(!kubernetesClient.pods().withName(removed.name).delete().isEmpty) {
+                info(s"[$kubernetesInfo] Operation of delete pod 
${removed.name} with" +
+                  s" ${toLabel(notification.getKey)} is completed.")
+              } else {
+                warn(s"[$kubernetesInfo] Failed to delete pod ${removed.name} 
with" +
+                  s" ${toLabel(notification.getKey)}.")
+              }
+            } catch {
+              case NonFatal(e) => error(
+                  s"[$kubernetesInfo] Failed to delete pod ${removed.name} 
with" +
+                    s" ${toLabel(notification.getKey)}",
+                  e)
+            }
+          }
           info(s"Remove terminated application ${removed.id} with " +
             s"[${toLabel(notification.getKey)}, state: ${removed.state}]")
         }
@@ -127,7 +147,7 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
     debug(s"[$kubernetesInfo] Deleting application[${toLabel(tag)}]'s info 
from Kubernetes cluster")
     try {
       Option(appInfoStore.get(tag)) match {
-        case Some(info) =>
+        case Some((_, info)) =>
           debug(s"Application[${toLabel(tag)}] is in ${info.state} state")
           info.state match {
             case NOT_FOUND | FAILED | UNKNOWN =>
@@ -167,7 +187,8 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
     try {
       // need to initialize the kubernetes client if not exists
       getOrCreateKubernetesClient(appMgrInfo.kubernetesInfo)
-      val appInfo = appInfoStore.getOrDefault(tag, ApplicationInfo.NOT_FOUND)
+      val (_, appInfo) =
+        appInfoStore.getOrDefault(tag, appMgrInfo.kubernetesInfo -> 
ApplicationInfo.NOT_FOUND)
       (appInfo.state, submitTime) match {
         // Kyuubi should wait second if pod is not be created
         case (NOT_FOUND, Some(_submitTime)) =>
@@ -216,14 +237,14 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
 
     override def onAdd(pod: Pod): Unit = {
       if (isSparkEnginePod(pod)) {
-        updateApplicationState(pod)
+        updateApplicationState(kubernetesInfo, pod)
         KubernetesApplicationAuditLogger.audit(kubernetesInfo, pod)
       }
     }
 
     override def onUpdate(oldPod: Pod, newPod: Pod): Unit = {
       if (isSparkEnginePod(newPod)) {
-        updateApplicationState(newPod)
+        updateApplicationState(kubernetesInfo, newPod)
         val appState = toApplicationState(newPod.getStatus.getPhase)
         if (isTerminated(appState)) {
           markApplicationTerminated(newPod)
@@ -234,7 +255,7 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
 
     override def onDelete(pod: Pod, deletedFinalStateUnknown: Boolean): Unit = 
{
       if (isSparkEnginePod(pod)) {
-        updateApplicationState(pod)
+        updateApplicationState(kubernetesInfo, pod)
         markApplicationTerminated(pod)
         KubernetesApplicationAuditLogger.audit(kubernetesInfo, pod)
       }
@@ -246,12 +267,12 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
     labels.containsKey(LABEL_KYUUBI_UNIQUE_KEY) && 
labels.containsKey(SPARK_APP_ID_LABEL)
   }
 
-  private def updateApplicationState(pod: Pod): Unit = {
+  private def updateApplicationState(kubernetesInfo: KubernetesInfo, pod: 
Pod): Unit = {
     val appState = toApplicationState(pod.getStatus.getPhase)
     debug(s"Driver Informer changes pod: ${pod.getMetadata.getName} to state: 
$appState")
     appInfoStore.put(
       pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
-      ApplicationInfo(
+      kubernetesInfo -> ApplicationInfo(
         id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL),
         name = pod.getMetadata.getName,
         state = appState,

Reply via email to