This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 27ad1024b [KYUUBI #5795][K8S] Support to cleanup the spark driver pod
periodically
27ad1024b is described below
commit 27ad1024b581f7016dccabbec8ddf0f7fa314bf7
Author: yeatsliao <[email protected]>
AuthorDate: Thu Dec 7 13:55:16 2023 +0800
[KYUUBI #5795][K8S] Support to cleanup the spark driver pod periodically
# :mag: Description
## Issue References 🔗
This pull request fixes #5795
## Describe Your Solution 🔧
Create a single daemon thread that periodically traverses the cache map,
which evicts expired cache entries and thereby triggers the pod cleanup operation.
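The reason a dedicated thread is needed can be illustrated with a small, self-contained Scala sketch. It does not use Guava: `LazyExpiringCache` and its injectable clock are hypothetical stand-ins that mimic the behavior described for the config option, namely that an entry written more than `ttlMillis` ago is only evicted (and its removal listener fired) when the cache is touched. A periodic `sweep()` that calls `getIfPresent` on every key is therefore what actually drives the cleanup.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a cache built with expireAfterWrite: expired
// entries are removed lazily, only when they are looked up.
final class LazyExpiringCache[K, V](
    ttlMillis: Long,
    now: () => Long,
    onRemoval: (K, V) => Unit) {

  // value together with the time it was written
  private val entries = mutable.LinkedHashMap.empty[K, (V, Long)]

  def put(key: K, value: V): Unit = synchronized {
    entries.update(key, (value, now()))
  }

  // Returns the value if it has not expired; an expired entry is evicted
  // here and the removal listener is invoked for it.
  def getIfPresent(key: K): Option[V] = synchronized {
    entries.get(key) match {
      case Some((value, writtenAt)) if now() - writtenAt >= ttlMillis =>
        entries.remove(key)
        onRemoval(key, value)
        None
      case Some((value, _)) => Some(value)
      case None => None
    }
  }

  def size: Int = synchronized(entries.size)

  // What the periodic task does: look up every key so expired ones are evicted.
  def sweep(): Unit = {
    val snapshot = synchronized(entries.keys.toList)
    snapshot.foreach(key => getIfPresent(key))
  }
}
```

In the PR, the role of `sweep()` is played by the scheduled task that calls `getIfPresent` for each key of `asMap()`, and the removal listener registered on the `CacheBuilder` is where the driver pod cleanup is triggered.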
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklists
## 📝 Author Self Checklist
- [x] My code follows the [style
guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html)
of this project
- [ ] I have performed a self-review
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature
works
- [ ] New and existing unit tests pass locally with my changes
- [ ] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
## 📝 Committer Pre-Merge Checklist
- [x] Pull request title is okay.
- [x] No license issues.
- [x] Milestone correctly set?
- [ ] Test coverage is ok
- [x] Assignees are selected.
- [x] Minimum number of approvals
- [x] No changes are requested
**Be nice. Be informative.**
Closes #5806 from liaoyt/master.
Closes #5795
75c2b68cc [yeatsliao] cleanup driver pod periodically
Authored-by: yeatsliao <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
docs/configuration/settings.md | 3 ++-
.../org/apache/kyuubi/config/KyuubiConf.scala | 13 +++++++++--
.../engine/KubernetesApplicationOperation.scala | 27 +++++++++++++++++++---
3 files changed, 37 insertions(+), 6 deletions(-)
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 651829b4f..a3c0ca0df 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -326,7 +326,8 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.kubernetes.master.address | <undefined> | The internal Kubernetes master (API server) address to be used for kyuubi. | string | 1.7.0 |
 | kyuubi.kubernetes.namespace | default | The namespace that will be used for running the kyuubi pods and find engines. | string | 1.7.0 |
 | kyuubi.kubernetes.namespace.allow.list || The allowed kubernetes namespace list, if it is empty, there is no kubernetes namespace limitation. | set | 1.8.0 |
-| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod | NONE | Kyuubi server will delete the spark driver pod after the application terminates for kyuubi.kubernetes.terminatedApplicationRetainPeriod. Available options are NONE, ALL, COMPLETED and default value is None which means none of the pod will be deleted | string | 1.8.1 |
+| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.checkInterval | PT1M | Kyuubi server use guava cache as the cleanup trigger with time-based eviction, but the eviction would not happened until any get/put operation happened. This option schedule a daemon thread evict cache periodically. | duration | 1.8.1 |
+| kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.kind | NONE | Kyuubi server will delete the spark driver pod after the application terminates for kyuubi.kubernetes.terminatedApplicationRetainPeriod. Available options are NONE, ALL, COMPLETED and default value is None which means none of the pod will be deleted | string | 1.8.1 |
 | kyuubi.kubernetes.spark.forciblyRewriteDriverPodName.enabled | false | Whether to forcibly rewrite Spark driver pod name with 'kyuubi-<uuid>-driver'. If disabled, Kyuubi will try to preserve the application name while satisfying K8s' pod name policy, but some vendors may have stricter pod name policies, thus the generated name may become illegal. | boolean | 1.8.1 |
 | kyuubi.kubernetes.spark.forciblyRewriteExecutorPodNamePrefix.enabled | false | Whether to forcibly rewrite Spark executor pod name prefix with 'kyuubi-<uuid>'. If disabled, Kyuubi will try to preserve the application name while satisfying K8s' pod name policy, but some vendors may have stricter Pod name policies, thus the generated name may become illegal. | boolean | 1.8.1 |
 | kyuubi.kubernetes.terminatedApplicationRetainPeriod | PT5M | The period for which the Kyuubi server retains application information after the application terminates. | duration | 1.7.1 |
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index dcd84e7be..b655f7984 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1231,8 +1231,17 @@ object KyuubiConf {
       .checkValue(_ > 0, "must be positive number")
       .createWithDefault(Duration.ofMinutes(5).toMillis)
-  val KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD: ConfigEntry[String] =
-    buildConf("kyuubi.kubernetes.spark.cleanupTerminatedDriverPod")
+  val KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND_CHECK_INTERVAL: ConfigEntry[Long] =
+    buildConf("kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.checkInterval")
+      .doc("Kyuubi server use guava cache as the cleanup trigger with time-based eviction, " +
+        "but the eviction would not happened until any get/put operation happened. " +
+        "This option schedule a daemon thread evict cache periodically.")
+      .version("1.8.1")
+      .timeConf
+      .createWithDefaultString("PT1M")
+
+  val KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND: ConfigEntry[String] =
+    buildConf("kyuubi.kubernetes.spark.cleanupTerminatedDriverPod.kind")
       .doc("Kyuubi server will delete the spark driver pod after " +
         s"the application terminates for ${KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD.key}. " +
         "Available options are NONE, ALL, COMPLETED and " +
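Both time-based settings touched here end up as `Long` values: the retain period's default is written as `Duration.ofMinutes(5).toMillis`, and the new check interval is declared through `.timeConf` with the ISO-8601 default `PT1M`. Assuming `timeConf` follows the same convention (an ISO-8601 duration converted to milliseconds, with a bare number read as milliseconds), the conversion can be sketched with `java.time.Duration`. `TimeConfSketch.parseTimeConfMillis` is a hypothetical helper for illustration, not Kyuubi's API.

```scala
import java.time.Duration
import java.time.format.DateTimeParseException

object TimeConfSketch {
  // Hypothetical helper mirroring the assumed timeConf convention:
  // ISO-8601 durations such as "PT1M" are converted to milliseconds, and a
  // bare number is taken to be milliseconds already.
  def parseTimeConfMillis(value: String): Long =
    try Duration.parse(value.trim).toMillis
    catch {
      case e: DateTimeParseException =>
        // fall back to a plain millisecond count; rethrow the original
        // parse error if the value is not a number either
        try value.trim.toLong
        catch { case _: NumberFormatException => throw e }
    }
}
```

Under that assumption, the `PT1M` default arrives as `60000L` and the `PT5M` retain period as `300000L`, so whatever consumes the check interval has to treat it as a millisecond count.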
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index 922dd9a15..c7c69cc16 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -18,7 +18,7 @@
 package org.apache.kyuubi.engine
 import java.util.Locale
-import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, TimeUnit}
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
@@ -34,7 +34,7 @@ import org.apache.kyuubi.config.KyuubiConf.{KubernetesApplicationStateSource, Ku
 import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource
 import org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{ALL, COMPLETED, NONE}
 import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
-import org.apache.kyuubi.util.KubernetesUtils
+import org.apache.kyuubi.util.{KubernetesUtils, ThreadUtils}
 class KubernetesApplicationOperation extends ApplicationOperation with Logging {
   import KubernetesApplicationOperation._
@@ -64,6 +64,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
   // key is kyuubi_unique_key
   private var cleanupTerminatedAppInfoTrigger: Cache[String, ApplicationState] = _
+  private var expireCleanUpTriggerCacheExecutor: ScheduledExecutorService = _
+
   private def getOrCreateKubernetesClient(kubernetesInfo: KubernetesInfo): KubernetesClient = {
     checkKubernetesInfo(kubernetesInfo)
     kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo => buildKubernetesClient(kInfo))
@@ -109,7 +111,9 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
     // Defer cleaning terminated application information
     val retainPeriod = conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD)
     val cleanupDriverPodStrategy = KubernetesCleanupDriverPodStrategy.withName(
-      conf.get(KyuubiConf.KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD))
+      conf.get(KyuubiConf.KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND))
+    val cleanupDriverPodCheckInterval = conf.get(
+      KyuubiConf.KUBERNETES_SPARK_CLEANUP_TERMINATED_DRIVER_POD_KIND_CHECK_INTERVAL)
     cleanupTerminatedAppInfoTrigger = CacheBuilder.newBuilder()
       .expireAfterWrite(retainPeriod, TimeUnit.MILLISECONDS)
       .removalListener((notification: RemovalNotification[String, ApplicationState]) => {
@@ -147,6 +151,23 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
         }
       })
       .build()
+    expireCleanUpTriggerCacheExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+      "pod-cleanup-trigger-thread")
+    expireCleanUpTriggerCacheExecutor.scheduleWithFixedDelay(
+      () => {
+        try {
+          cleanupTerminatedAppInfoTrigger.asMap().asScala.foreach {
+            case (key, _) =>
+              // do get to trigger cache eviction
+              cleanupTerminatedAppInfoTrigger.getIfPresent(key)
+          }
+        } catch {
+          case NonFatal(e) => error("Failed to evict clean up terminated app cache", e)
+        }
+      },
+      5,
+      cleanupDriverPodCheckInterval,
+      TimeUnit.MINUTES)
   }
   override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {
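Two details of the scheduling hunk are worth keeping in mind. First, `ScheduledExecutorService.scheduleWithFixedDelay` suppresses all later executions once a run throws, which is why the task body is wrapped in `try`/`catch NonFatal`. Second, `cleanupDriverPodCheckInterval` comes from a `timeConf` entry, like `retainPeriod`, which this file passes with `TimeUnit.MILLISECONDS`; the hunk passes `TimeUnit.MINUTES` for the delay, so if the value is a millisecond count it would be read as minutes. The sketch below uses only the JDK: a hand-written daemon `ThreadFactory` stands in for Kyuubi's `ThreadUtils.newDaemonSingleThreadScheduledExecutor`, and `SweeperSketch.startSweeper` is a hypothetical name. It keeps the interval and its unit together in milliseconds.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import scala.util.control.NonFatal

object SweeperSketch {
  // Daemon threads never keep the JVM alive on their own; this plays the
  // role of a helper such as ThreadUtils.newDaemonSingleThreadScheduledExecutor.
  private def daemonThreadFactory(name: String): ThreadFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, name)
      t.setDaemon(true)
      t
    }
  }

  // Hypothetical helper: runs `sweep` every `intervalMillis` milliseconds and
  // uses the same value as the initial delay, so both share a single unit.
  // Failures are caught because scheduleWithFixedDelay suppresses all later
  // runs once a task throws.
  def startSweeper(intervalMillis: Long)(sweep: () => Unit): ScheduledExecutorService = {
    require(intervalMillis > 0, "interval must be positive")
    val executor =
      Executors.newSingleThreadScheduledExecutor(daemonThreadFactory("pod-cleanup-trigger-thread"))
    val task: Runnable = () =>
      try sweep()
      catch { case NonFatal(e) => System.err.println(s"Sweep failed: $e") }
    executor.scheduleWithFixedDelay(task, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS)
    executor
  }
}
```

With a Guava cache, the `sweep` function could be the key-by-key `getIfPresent` loop from the hunk, or a call to Guava's `Cache.cleanUp()`, which asks the cache to perform pending maintenance (for caches built by `CacheBuilder` this includes removing expired entries and notifying the removal listener) and may be a simpler alternative.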