This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 27ad1024b [KYUUBI #5795][K8S] Support to cleanup the spark driver pod periodically
27ad1024b is described below

commit 27ad1024b581f7016dccabbec8ddf0f7fa314bf7
Author: yeatsliao <[email protected]>
AuthorDate: Thu Dec 7 13:55:16 2023 +0800

    [KYUUBI #5795][K8S] Support to cleanup the spark driver pod periodically
    
    # :mag: Description
    ## Issue References 🔗
    
    This pull request fixes #5795
    
    ## Describe Your Solution 🔧
    
    Create a single daemon thread that periodically traverses the cache map, evicting expired entries and triggering the pod cleanup operation for each of them.
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [x] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklists
    ## 📝 Author Self Checklist
    
    - [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
    - [ ] I have performed a self-review
    - [ ] I have commented my code, particularly in hard-to-understand areas
    - [ ] I have made corresponding changes to the documentation
    - [ ] My changes generate no new warnings
    - [ ] I have added tests that prove my fix is effective or that my feature works
    - [ ] New and existing unit tests pass locally with my changes
    - [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    ## 📝 Committer Pre-Merge Checklist
    
    - [x] Pull request title is okay.
    - [x] No license issues.
    - [x] Milestone correctly set?
    - [ ] Test coverage is ok
    - [x] Assignees are selected.
    - [x] Minimum number of approvals
    - [x] No changes are requested
    
    **Be nice. Be informative.**
    
    Closes #5806 from liaoyt/master.
    
    Closes #5795
    
    75c2b68cc [yeatsliao] cleanup driver pod periodically
    
    Authored-by: yeatsliao <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 docs/configuration/settings.md                     |  3 ++-
 .../org/apache/kyuubi/config/KyuubiConf.scala      | 13 +++++++++--
 .../engine/KubernetesApplicationOperation.scala    | 27 +++++++++++++++++++---
 3 files changed, 37 insertions(+), 6 deletions(-)

diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 651829b4f..a3c0ca0df 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -326,7 +326,8 @@ You can configure the Kyuubi properties in 
`$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.kubernetes.master.address                                     | 
&lt;undefined&gt;       | The internal Kubernetes master (API server) address 
to be used for kyuubi.                                                          
                                                                                
                                                                                
                                                     | string   | 1.7.0 |
 | kyuubi.kubernetes.namespace                                          | 
default                 | The namespace that will be used for running the 
kyuubi pods and find engines.                                                   
                                                                                
                                                                                
                                                         | string   | 1.7.0 |
 | kyuubi.kubernetes.namespace.allow.list                                       
                 || The allowed kubernetes namespace list, if it is empty, 
there is no kubernetes namespace limitation.                                    
                                                                                
                                                                                
                                                  | set      | 1.8.0 |
-| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod                   | NONE  
                  | Kyuubi server will delete the spark driver pod after the 
application terminates for kyuubi.kubernetes.terminatedApplicationRetainPeriod. 
Available options are NONE, ALL, COMPLETED and default value is None which 
means none of the pod will be deleted                                           
                                                     | string   | 1.8.1 |
+| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.checkInterval     | PT1M  
                  | Kyuubi server use guava cache as the cleanup trigger with 
time-based eviction, but the eviction would not happened until any get/put 
operation happened. This option schedule a daemon thread evict cache 
periodically.                                                                   
                                                               | duration | 
1.8.1 |
+| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.kind              | NONE  
                  | Kyuubi server will delete the spark driver pod after the 
application terminates for kyuubi.kubernetes.terminatedApplicationRetainPeriod. 
Available options are NONE, ALL, COMPLETED and default value is None which 
means none of the pod will be deleted                                           
                                                     | string   | 1.8.1 |
 | kyuubi.kubernetes.spark.forciblyRewriteDriverPodName.enabled         | false 
                  | Whether to forcibly rewrite Spark driver pod name with 
'kyuubi-<uuid>-driver'. If disabled, Kyuubi will try to preserve the 
application name while satisfying K8s' pod name policy, but some vendors may 
have stricter pod name policies, thus the generated name may become illegal.    
                                                                | boolean  | 
1.8.1 |
 | kyuubi.kubernetes.spark.forciblyRewriteExecutorPodNamePrefix.enabled | false 
                  | Whether to forcibly rewrite Spark executor pod name prefix 
with 'kyuubi-<uuid>'. If disabled, Kyuubi will try to preserve the application 
name while satisfying K8s' pod name policy, but some vendors may have stricter 
Pod name policies, thus the generated name may become illegal.                  
                                                | boolean  | 1.8.1 |
 | kyuubi.kubernetes.terminatedApplicationRetainPeriod                  | PT5M  
                  | The period for which the Kyuubi server retains application 
information after the application terminates.                                   
                                                                                
                                                                                
                                              | duration | 1.7.1 |
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index dcd84e7be..b655f7984 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1231,8 +1231,17 @@ object KyuubiConf {
       .checkValue(_ > 0, "must be positive number")
       .createWithDefault(Duration.ofMinutes(5).toMillis)
 
-  val KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD: ConfigEntry[String] =
-    buildConf("kyuubi.kubernetes.spark.cleanupTerminatedDriverPod")
+  val KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND_CHECK_INTERVAL: 
ConfigEntry[Long] =
+    
buildConf("kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.checkInterval")
+      .doc("Kyuubi server use guava cache as the cleanup trigger with 
time-based eviction, " +
+        "but the eviction would not happened until any get/put operation 
happened. " +
+        "This option schedule a daemon thread evict cache periodically.")
+      .version("1.8.1")
+      .timeConf
+      .createWithDefaultString("PT1M")
+
+  val KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND: ConfigEntry[String] 
=
+    buildConf("kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.kind")
       .doc("Kyuubi server will delete the spark driver pod after " +
         s"the application terminates for 
${KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD.key}. " +
         "Available options are NONE, ALL, COMPLETED and " +
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index 922dd9a15..c7c69cc16 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -18,7 +18,7 @@
 package org.apache.kyuubi.engine
 
 import java.util.Locale
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, 
TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
@@ -34,7 +34,7 @@ import 
org.apache.kyuubi.config.KyuubiConf.{KubernetesApplicationStateSource, Ku
 import 
org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource
 import 
org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{ALL, 
COMPLETED, NONE}
 import org.apache.kyuubi.engine.ApplicationState.{isTerminated, 
ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
-import org.apache.kyuubi.util.KubernetesUtils
+import org.apache.kyuubi.util.{KubernetesUtils, ThreadUtils}
 
 class KubernetesApplicationOperation extends ApplicationOperation with Logging 
{
   import KubernetesApplicationOperation._
@@ -64,6 +64,8 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
   // key is kyuubi_unique_key
   private var cleanupTerminatedAppInfoTrigger: Cache[String, ApplicationState] 
= _
 
+  private var expireCleanUpTriggerCacheExecutor: ScheduledExecutorService = _
+
   private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): 
KubernetesClient = {
     checkKubernetesInfo(kubernetesInfo)
     kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo => 
buildKubernetesClient(kInfo))
@@ -109,7 +111,9 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
     // Defer cleaning terminated application information
     val retainPeriod = 
conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD)
     val cleanupDriverPodStrategy = KubernetesCleanupDriverPodStrategy.withName(
-      conf.get(KyuubiConf.KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD))
+      conf.get(KyuubiConf.KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND))
+    val cleanupDriverPodCheckInterval = conf.get(
+      
KyuubiConf.KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND_CHECK_INTERVAL)
     cleanupTerminatedAppInfoTrigger = CacheBuilder.newBuilder()
       .expireAfterWrite(retainPeriod, TimeUnit.MILLISECONDS)
       .removalListener((notification: RemovalNotification[String, 
ApplicationState]) => {
@@ -147,6 +151,23 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
         }
       })
       .build()
+    expireCleanUpTriggerCacheExecutor = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+      "pod-cleanup-trigger-thread")
+    expireCleanUpTriggerCacheExecutor.scheduleWithFixedDelay(
+      () => {
+        try {
+          cleanupTerminatedAppInfoTrigger.asMap().asScala.foreach {
+            case (key, _) =>
+              // do get to trigger cache eviction
+              cleanupTerminatedAppInfoTrigger.getIfPresent(key)
+          }
+        } catch {
+          case NonFatal(e) => error("Failed to evict clean up terminated app 
cache", e)
+        }
+      },
+      5,
+      cleanupDriverPodCheckInterval,
+      TimeUnit.MINUTES)
   }
 
   override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {
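
The periodic eviction introduced in this commit can be illustrated with a minimal, standalone sketch. It is not the Kyuubi implementation: the object name `PeriodicEvictionSketch`, the method `evictedKeys`, the thread name, and the string values are hypothetical, and it calls Guava's `Cache.cleanUp()` instead of the per-key `getIfPresent` traversal used in the diff. It assumes Guava is on the classpath and that all durations are given in milliseconds, so the schedule is created with `TimeUnit.MILLISECONDS`.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, ThreadFactory, TimeUnit}

import com.google.common.cache.{Cache, CacheBuilder, RemovalListener, RemovalNotification}

// Hypothetical sketch: a time-based Guava cache whose expired entries are
// evicted by a background sweeper rather than by a later get/put call.
object PeriodicEvictionSketch {

  /** Puts one entry, never touches the cache again, and returns the keys the sweeper evicted. */
  def evictedKeys(retainMillis: Long, checkIntervalMillis: Long, waitMillis: Long): List[String] = {
    val evicted = new ConcurrentLinkedQueue[String]()

    // Stand-in for the pod cleanup hook: only record the evicted key.
    val listener = new RemovalListener[String, String] {
      override def onRemoval(n: RemovalNotification[String, String]): Unit = {
        evicted.add(n.getKey)
      }
    }

    val cache: Cache[String, String] = CacheBuilder.newBuilder()
      .expireAfterWrite(retainMillis, TimeUnit.MILLISECONDS)
      .removalListener[String, String](listener)
      .build[String, String]()

    // Daemon thread, so the sweeper never blocks JVM shutdown.
    val sweeper = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "cache-sweeper-sketch")
        t.setDaemon(true)
        t
      }
    })

    // The interval is in milliseconds, so the schedule uses the same unit.
    sweeper.scheduleWithFixedDelay(
      new Runnable { override def run(): Unit = cache.cleanUp() },
      checkIntervalMillis,
      checkIntervalMillis,
      TimeUnit.MILLISECONDS)

    cache.put("app-1", "FINISHED")
    Thread.sleep(waitMillis) // no get/put here; only the sweeper touches the cache
    sweeper.shutdownNow()
    evicted.toArray(new Array[String](0)).toList
  }

  def main(args: Array[String]): Unit = {
    println(evictedKeys(retainMillis = 100L, checkIntervalMillis = 50L, waitMillis = 1000L))
  }
}
```

In this sketch `retainMillis` plays the role of `kyuubi.kubernetes.terminatedApplicationRetainPeriod` and `checkIntervalMillis` the role of `kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.checkInterval`. Without the sweeper, the removal listener would fire only when some later cache operation happened to run maintenance.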
