This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 88fae49d7 [KYUUBI #5714] [K8S] Support to cleanup the spark driver pod after application terminates for retain period
88fae49d7 is described below
commit 88fae49d7f28884d7d022d329b789e7cb728132c
Author: fwang12 <[email protected]>
AuthorDate: Thu Nov 16 19:33:00 2023 +0800
[KYUUBI #5714] [K8S] Support to cleanup the spark driver pod after application terminates for retain period
# :mag: Description
## Describe Your Solution 🔧
As titled, support cleaning up the Spark driver pod after the application terminates, once the configured retain period has elapsed.
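The behavior is opt-in via `kyuubi.kubernetes.spark.deleteDriverPodOnTermination.enabled`, and the pod is only removed after `kyuubi.kubernetes.terminatedApplicationRetainPeriod` expires. As a rough illustration only (not part of this patch; the object name is hypothetical and the `KyuubiConf` set API from kyuubi-common is assumed), the two settings could be toggled programmatically; in a real deployment they would normally be placed in `kyuubi-defaults.conf` instead:

```scala
import java.time.Duration

import org.apache.kyuubi.config.KyuubiConf

// Illustrative sketch only; names here are hypothetical.
object EnableDriverPodCleanupSketch {
  val conf: KyuubiConf = KyuubiConf()
    // opt in: delete the Spark driver pod once the retain period expires
    .set(KyuubiConf.KUBERNETES_SPARK_DELETE_DRIVER_POD_ON_TERMINATION_ENABLED, true)
    // retain terminated application info (and now the pod) for 2 minutes instead of the default PT5M
    .set(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD, Duration.ofMinutes(2).toMillis)
}
```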
## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
## Test Plan 🧪
Test locally.
---
# Checklists
## 📝 Author Self Checklist
- [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [x] I have performed a self-review
- [x] I have commented my code, particularly in hard-to-understand areas
- [x] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
## 📝 Committer Pre-Merge Checklist
- [x] Pull request title is okay.
- [x] No license issues.
- [x] Milestone correctly set?
- [ ] Test coverage is ok
- [x] Assignees are selected.
- [x] Minimum number of approvals
- [ ] No changes are requested
**Be nice. Be informative.**
Closes #5714 from turboFei/kill_k8s_pod.
Closes #5714
1e0787868 [fwang12] doc
0c9ff1a44 [fwang12] cleanup pod
ab95d4c03 [fwang12] save
Authored-by: fwang12 <[email protected]>
Signed-off-by: fwang12 <[email protected]>
---
docs/configuration/settings.md | 1 +
.../org/apache/kyuubi/config/KyuubiConf.scala | 8 +++++
.../engine/KubernetesApplicationOperation.scala | 41 ++++++++++++++++------
3 files changed, 40 insertions(+), 10 deletions(-)
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 5577da674..b14a8ada1 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -322,6 +322,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.kubernetes.master.address | &lt;undefined&gt; | The internal Kubernetes master (API server) address to be used for kyuubi. | string | 1.7.0 |
 | kyuubi.kubernetes.namespace | default | The namespace that will be used for running the kyuubi pods and find engines. | string | 1.7.0 |
 | kyuubi.kubernetes.namespace.allow.list | | The allowed kubernetes namespace list, if it is empty, there is no kubernetes namespace limitation. | set | 1.8.0 |
+| kyuubi.kubernetes.spark.deleteDriverPodOnTermination.enabled | false | If set to true then Kyuubi server will delete the spark driver pod after the application terminates for kyuubi.kubernetes.terminatedApplicationRetainPeriod. | boolean | 1.8.1 |
 | kyuubi.kubernetes.spark.forciblyRewriteDriverPodName.enabled | false | Whether to forcibly rewrite Spark driver pod name with 'kyuubi-<uuid>-driver'. If disabled, Kyuubi will try to preserve the application name while satisfying K8s' pod name policy, but some vendors may have stricter pod name policies, thus the generated name may become illegal. | boolean | 1.8.1 |
 | kyuubi.kubernetes.spark.forciblyRewriteExecutorPodNamePrefix.enabled | false | Whether to forcibly rewrite Spark executor pod name prefix with 'kyuubi-<uuid>'. If disabled, Kyuubi will try to preserve the application name while satisfying K8s' pod name policy, but some vendors may have stricter Pod name policies, thus the generated name may become illegal. | boolean | 1.8.1 |
 | kyuubi.kubernetes.terminatedApplicationRetainPeriod | PT5M | The period for which the Kyuubi server retains application information after the application terminates. | duration | 1.7.1 |
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index a48f4ba9f..373c0190b 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1231,6 +1231,14 @@ object KyuubiConf {
       .checkValue(_ > 0, "must be positive number")
       .createWithDefault(Duration.ofMinutes(5).toMillis)
 
+  val KUBERNETES_SPARK_DELETE_DRIVER_POD_ON_TERMINATION_ENABLED: ConfigEntry[Boolean] =
+    buildConf("kyuubi.kubernetes.spark.deleteDriverPodOnTermination.enabled")
+      .doc("If set to true then Kyuubi server will delete the spark driver pod after " +
+        s"the application terminates for ${KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD.key}.")
+      .version("1.8.1")
+      .booleanConf
+      .createWithDefault(false)
+
   // ///////////////////////////////////////////////////////////////////////////////////////////////
   //                                      SQL Engine Configuration                                  //
   // ///////////////////////////////////////////////////////////////////////////////////////////////
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index 16a0c29d1..a78dd8eb6 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -21,6 +21,7 @@ import java.util.Locale
 import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
 
 import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
 import io.fabric8.kubernetes.api.model.Pod
@@ -49,8 +50,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
     kyuubiConf.get(KyuubiConf.KUBERNETES_NAMESPACE_ALLOW_LIST)
 
   // key is kyuubi_unique_key
-  private val appInfoStore: ConcurrentHashMap[String, ApplicationInfo] =
-    new ConcurrentHashMap[String, ApplicationInfo]
+  private val appInfoStore: ConcurrentHashMap[String, (KubernetesInfo, ApplicationInfo)] =
+    new ConcurrentHashMap[String, (KubernetesInfo, ApplicationInfo)]
   // key is kyuubi_unique_key
   private var cleanupTerminatedAppInfoTrigger: Cache[String, ApplicationState] = _
 
@@ -98,10 +99,29 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
     submitTimeout = conf.get(KyuubiConf.ENGINE_KUBERNETES_SUBMIT_TIMEOUT)
     // Defer cleaning terminated application information
     val retainPeriod = conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD)
+    val deleteSparkDriverPodOnTermination =
+      conf.get(KyuubiConf.KUBERNETES_SPARK_DELETE_DRIVER_POD_ON_TERMINATION_ENABLED)
     cleanupTerminatedAppInfoTrigger = CacheBuilder.newBuilder()
       .expireAfterWrite(retainPeriod, TimeUnit.MILLISECONDS)
       .removalListener((notification: RemovalNotification[String, ApplicationState]) => {
-        Option(appInfoStore.remove(notification.getKey)).foreach { removed =>
+        Option(appInfoStore.remove(notification.getKey)).foreach { case (kubernetesInfo, removed) =>
+          if (deleteSparkDriverPodOnTermination) {
+            try {
+              val kubernetesClient = getOrCreateKubernetesClient(kubernetesInfo)
+              if (!kubernetesClient.pods().withName(removed.name).delete().isEmpty) {
+                info(s"[$kubernetesInfo] Operation of delete pod ${removed.name} with" +
+                  s" ${toLabel(notification.getKey)} is completed.")
+              } else {
+                warn(s"[$kubernetesInfo] Failed to delete pod ${removed.name} with" +
+                  s" ${toLabel(notification.getKey)}.")
+              }
+            } catch {
+              case NonFatal(e) => error(
+                s"[$kubernetesInfo] Failed to delete pod ${removed.name} with" +
+                  s" ${toLabel(notification.getKey)}",
+                e)
+            }
+          }
           info(s"Remove terminated application ${removed.id} with " +
             s"[${toLabel(notification.getKey)}, state: ${removed.state}]")
         }
@@ -127,7 +147,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
     debug(s"[$kubernetesInfo] Deleting application[${toLabel(tag)}]'s info from Kubernetes cluster")
     try {
       Option(appInfoStore.get(tag)) match {
-        case Some(info) =>
+        case Some((_, info)) =>
           debug(s"Application[${toLabel(tag)}] is in ${info.state} state")
           info.state match {
             case NOT_FOUND | FAILED | UNKNOWN =>
@@ -167,7 +187,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
     try {
       // need to initialize the kubernetes client if not exists
      getOrCreateKubernetesClient(appMgrInfo.kubernetesInfo)
-      val appInfo = appInfoStore.getOrDefault(tag, ApplicationInfo.NOT_FOUND)
+      val (_, appInfo) =
+        appInfoStore.getOrDefault(tag, appMgrInfo.kubernetesInfo -> ApplicationInfo.NOT_FOUND)
       (appInfo.state, submitTime) match {
         // Kyuubi should wait second if pod is not be created
         case (NOT_FOUND, Some(_submitTime)) =>
@@ -216,14 +237,14 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
 
     override def onAdd(pod: Pod): Unit = {
       if (isSparkEnginePod(pod)) {
-        updateApplicationState(pod)
+        updateApplicationState(kubernetesInfo, pod)
         KubernetesApplicationAuditLogger.audit(kubernetesInfo, pod)
       }
     }
 
     override def onUpdate(oldPod: Pod, newPod: Pod): Unit = {
       if (isSparkEnginePod(newPod)) {
-        updateApplicationState(newPod)
+        updateApplicationState(kubernetesInfo, newPod)
         val appState = toApplicationState(newPod.getStatus.getPhase)
         if (isTerminated(appState)) {
           markApplicationTerminated(newPod)
@@ -234,7 +255,7 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
 
     override def onDelete(pod: Pod, deletedFinalStateUnknown: Boolean): Unit = {
       if (isSparkEnginePod(pod)) {
-        updateApplicationState(pod)
+        updateApplicationState(kubernetesInfo, pod)
         markApplicationTerminated(pod)
         KubernetesApplicationAuditLogger.audit(kubernetesInfo, pod)
       }
@@ -246,12 +267,12 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
       labels.containsKey(LABEL_KYUUBI_UNIQUE_KEY) && labels.containsKey(SPARK_APP_ID_LABEL)
     }
 
-    private def updateApplicationState(pod: Pod): Unit = {
+    private def updateApplicationState(kubernetesInfo: KubernetesInfo, pod: Pod): Unit = {
      val appState = toApplicationState(pod.getStatus.getPhase)
      debug(s"Driver Informer changes pod: ${pod.getMetadata.getName} to state: $appState")
      appInfoStore.put(
        pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
-        ApplicationInfo(
+        kubernetesInfo -> ApplicationInfo(
          id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL),
          name = pod.getMetadata.getName,
          state = appState,
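For readers skimming the diff: the deferred cleanup hinges on the existing Guava cache, whose entries expire after the retain period and whose removal listener now also deletes the driver pod through the fabric8 client when the new flag is enabled. Below is a minimal, self-contained sketch of that pattern under simplifying assumptions: the object name, the plain pod-name cache value, the default-namespace lookup, and the single shared client are all hypothetical, whereas the real operation resolves a client per KubernetesInfo, keys everything by the kyuubi unique tag, and only deletes when kyuubi.kubernetes.spark.deleteDriverPodOnTermination.enabled is true.

```scala
import java.util.concurrent.{Executors, TimeUnit}

import scala.util.control.NonFatal

import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
import io.fabric8.kubernetes.client.KubernetesClientBuilder

object DriverPodCleanupSketch {
  def main(args: Array[String]): Unit = {
    val client = new KubernetesClientBuilder().build()
    val retainPeriodMs = 5 * 60 * 1000L // mirrors the PT5M default of the retain-period config

    // Entries expire retainPeriodMs after being written; the removal listener then
    // best-effort deletes the corresponding driver pod, as in the patched listener above.
    val trigger: Cache[String, String] = CacheBuilder.newBuilder()
      .expireAfterWrite(retainPeriodMs, TimeUnit.MILLISECONDS)
      .removalListener((n: RemovalNotification[String, String]) => {
        val podName = n.getValue
        try {
          // fabric8 6.x delete() returns the deleted resources' StatusDetails
          if (!client.pods().withName(podName).delete().isEmpty) {
            println(s"Deleted driver pod $podName for tag ${n.getKey}")
          } else {
            println(s"Driver pod $podName for tag ${n.getKey} was already gone")
          }
        } catch {
          case NonFatal(e) => e.printStackTrace() // never let cleanup failures escape the listener
        }
      })
      .build[String, String]()

    // Marking an application as terminated starts the retain-period clock.
    trigger.put("kyuubi-unique-tag", "kyuubi-xxx-driver")

    // Guava expires entries lazily, so poke the cache periodically to fire the listener.
    Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(
      () => trigger.cleanUp(), 1, 1, TimeUnit.MINUTES)
  }
}
```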