This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new d9e14f239 [KYUUBI #4623][K8S] KubernetesApplicationOperation uses Informer instead of list
d9e14f239 is described below
commit d9e14f239d5f27c3149519b70665fe36feaf0b3f
Author: zwangsheng <[email protected]>
AuthorDate: Fri Mar 31 15:21:59 2023 +0800
[KYUUBI #4623][K8S] KubernetesApplicationOperation uses Informer instead of list
### _Why are the changes needed?_
Close #4623
To reduce the pressure on the API server, use an informer, the Kubernetes-recommended way to maintain application state. Previously, the Kubernetes client polled the API server with a label selector to find the `spark driver pod`, which adds load on the API server when multiple REST applications are running at the same time.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
- [x] Run CI
Closes #4625 from zwangsheng/KYUUBI_4623.
Closes #4623
a415bef7f [Cheng Pan] nit
136d0db4d [Cheng Pan] 171
b5d3c237a [Cheng Pan] re-generate conf
bf14ad870 [Cheng Pan] nit
9ee7e04f9 [Cheng Pan] nit
301162ea0 [Cheng Pan] nit
1d426922b [Cheng Pan] nit
b95d7a650 [Cheng Pan] improve
cc8d2c7f4 [zwangsheng] fix comments
d017bafdf [zwangsheng] Set resycn 0
28f9a70d9 [zwangsheng] Reorder func & slow get app info
22d9c1662 [zwangsheng] fix setting
8e0940334 [zwangsheng] fix comments
10965d3df [zwangsheng] Rename fileter function => isSparkEnginePod
b02677154 [zwangsheng] rename
78c9fdb17 [zwangsheng] fix comments
6d31f70d1 [zwangsheng] Fix IT Test
f43bba2b9 [zwangsheng] fix
17e4f55eb [zwangsheng] debug
be8da790e [zwangsheng] debug
0db45a513 [zwangsheng] retest
a93786abc [zwangsheng] Fix style
652ee837e [zwangsheng] Add Setting & Debug
4add7e4e2 [zwangsheng] improve
1f4341237 [zwangsheng] remove unused import
35acd6106 [zwangsheng] fix compile
05dfc598e [zwangsheng] [KYUUBI #4623][Improvement][K8S] Remove cached app info when out of time
4ab530e99 [zwangsheng] [KYUUBI #4623][Improvement][K8S] kubernetesApplicationOperation Using Informer instead of list
Lead-authored-by: zwangsheng <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
docs/deployment/settings.md | 25 +--
.../test/spark/SparkOnKubernetesTestsSuite.scala | 17 +-
.../org/apache/kyuubi/config/KyuubiConf.scala | 11 +-
.../kyuubi/engine/ApplicationOperation.scala | 5 +
.../engine/KubernetesApplicationOperation.scala | 190 ++++++++++++++-------
5 files changed, 160 insertions(+), 88 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index e7d939b9b..960f2c328 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -169,7 +169,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.engine.spark.python.env.archive | <undefined> | Portable Python env archive used for Spark engine Python language mode. [...]
| kyuubi.engine.spark.python.env.archive.exec.path | bin/python | The Python exec path under the Python env archive. [...]
| kyuubi.engine.spark.python.home.archive | <undefined> | Spark archive containing $SPARK_HOME/python directory, which is used to init session Python worker for Python language mode. [...]
-| kyuubi.engine.submit.timeout | PT30S | Period to tolerant Driver Pod ephemerally invisible after submitting. In some Resource Managers, e.g. K8s, the Driver Pod is not invisible immediately after `spark-submit` is returned. [...]
+| kyuubi.engine.submit.timeout | PT30S | Period to tolerant Driver Pod ephemerally invisible after submitting. In some Resource Managers, e.g. K8s, the Driver Pod is not visible immediately after `spark-submit` is returned. [...]
| kyuubi.engine.trino.event.loggers | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go.<ul> <li>JSON: the events will be written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: to be done.</li></ul> [...]
| kyuubi.engine.trino.extra.classpath | <undefined> | The extra classpath for the Trino query engine, for configuring other libs which may need by the Trino engine [...]
| kyuubi.engine.trino.java.options | <undefined> | The extra Java options for the Trino query engine [...]
@@ -292,17 +292,18 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
### Kubernetes
-| Key | Default | Meaning | Type | Since |
-|-----------------------------------------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|-------|
-| kyuubi.kubernetes.authenticate.caCertFile | <undefined> | Path to the CA cert file for connecting to the Kubernetes API server over TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 |
-| kyuubi.kubernetes.authenticate.clientCertFile | <undefined> | Path to the client cert file for connecting to the Kubernetes API server over TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 |
-| kyuubi.kubernetes.authenticate.clientKeyFile | <undefined> | Path to the client key file for connecting to the Kubernetes API server over TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 |
-| kyuubi.kubernetes.authenticate.oauthToken | <undefined> | The OAuth token to use when authenticating against the Kubernetes API server. Note that unlike, the other authentication options, this must be the exact string value of the token to use for the authentication. | string | 1.7.0 |
-| kyuubi.kubernetes.authenticate.oauthTokenFile | <undefined> | Path to the file containing the OAuth token to use when authenticating against the Kubernetes API server. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 |
-| kyuubi.kubernetes.context | <undefined> | The desired context from your kubernetes config file used to configure the K8s client for interacting with the cluster. | string | 1.6.0 |
-| kyuubi.kubernetes.master.address | <undefined> | The internal Kubernetes master (API server) address to be used for kyuubi. | string | 1.7.0 |
-| kyuubi.kubernetes.namespace | default | The namespace that will be used for running the kyuubi pods and find engines. | string | 1.7.0 |
-| kyuubi.kubernetes.trust.certificates | false | If set to true then client can submit to kubernetes cluster only with token | boolean | 1.7.0 |
+| Key | Default | Meaning | Type | Since |
+|-----------------------------------------------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------|
+| kyuubi.kubernetes.authenticate.caCertFile | <undefined> | Path to the CA cert file for connecting to the Kubernetes API server over TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 |
+| kyuubi.kubernetes.authenticate.clientCertFile | <undefined> | Path to the client cert file for connecting to the Kubernetes API server over TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 |
+| kyuubi.kubernetes.authenticate.clientKeyFile | <undefined> | Path to the client key file for connecting to the Kubernetes API server over TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 |
+| kyuubi.kubernetes.authenticate.oauthToken | <undefined> | The OAuth token to use when authenticating against the Kubernetes API server. Note that unlike, the other authentication options, this must be the exact string value of the token to use for the authentication. | string | 1.7.0 |
+| kyuubi.kubernetes.authenticate.oauthTokenFile | <undefined> | Path to the file containing the OAuth token to use when authenticating against the Kubernetes API server. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 |
+| kyuubi.kubernetes.context | <undefined> | The desired context from your kubernetes config file used to configure the K8s client for interacting with the cluster. | string | 1.6.0 |
+| kyuubi.kubernetes.master.address | <undefined> | The internal Kubernetes master (API server) address to be used for kyuubi. | string | 1.7.0 |
+| kyuubi.kubernetes.namespace | default | The namespace that will be used for running the kyuubi pods and find engines. | string | 1.7.0 |
+| kyuubi.kubernetes.terminatedApplicationRetainPeriod | PT5M | The period for which the Kyuubi server retains application information after the application terminates. | duration | 1.7.1 |
+| kyuubi.kubernetes.trust.certificates | false | If set to true then client can submit to kubernetes cluster only with token | boolean | 1.7.0 |
### Metadata
diff --git a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index 831504608..9ddcc5937 100644
--- a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++ b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -208,19 +208,18 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
batchRequest.getConf.asScala.toMap,
batchRequest)
- val session = sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
- val batchJobSubmissionOp = session.batchJobSubmissionOp
-
- eventually(timeout(3.minutes), interval(50.milliseconds)) {
- val appInfo = batchJobSubmissionOp.getOrFetchCurrentApplicationInfo
- assert(appInfo.nonEmpty)
- assert(appInfo.exists(_.state == RUNNING))
- assert(appInfo.exists(_.name.startsWith(driverPodNamePrefix)))
+ // wait for driver pod start
+ eventually(timeout(3.minutes), interval(5.second)) {
+ // trigger k8sOperation init here
+ val appInfo = k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
+ assert(appInfo.state == RUNNING)
+ assert(appInfo.name.startsWith(driverPodNamePrefix))
}
val killResponse = k8sOperation.killApplicationByTag(sessionHandle.identifier.toString)
assert(killResponse._1)
- assert(killResponse._2 startsWith "Operation of deleted appId:")
+ assert(killResponse._2 endsWith "is completed")
+ assert(killResponse._2 contains sessionHandle.identifier.toString)
eventually(timeout(3.minutes), interval(50.milliseconds)) {
val appInfo = k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
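The updated test no longer pulls state through the batch operation; it retries `getApplicationInfoByTag` until the informer has observed a RUNNING driver pod. A minimal sketch of that retry-until-timeout idea in plain Scala, using a hypothetical `pollUntil` helper (not a ScalaTest or Kyuubi API):

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._

object PollSketch {
  /** Re-evaluates `check` every `interval` until it returns true or `timeout` elapses.
    * Hypothetical helper: unlike ScalaTest's `eventually`, it retries on `false`
    * rather than on thrown assertion failures. */
  def pollUntil(timeout: FiniteDuration, interval: FiniteDuration)(check: => Boolean): Boolean = {
    val deadline = timeout.fromNow
    @tailrec
    def loop(): Boolean =
      if (check) true
      else if (deadline.isOverdue()) false
      else {
        Thread.sleep(interval.toMillis)
        loop()
      }
    loop()
  }
}
```

In the suite itself, `eventually(timeout(3.minutes), interval(5.second))` plays this role, with the informer-backed lookup as the check; the longer interval is reasonable because each check now reads a local store rather than listing pods.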
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 5e7755952..b5229e2ad 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1171,6 +1171,15 @@ object KyuubiConf {
.booleanConf
.createWithDefault(false)
+ val KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD: ConfigEntry[Long] =
+ buildConf("kyuubi.kubernetes.terminatedApplicationRetainPeriod")
+ .doc("The period for which the Kyuubi server retains application information after " +
+ "the application terminates.")
+ .version("1.7.1")
+ .timeConf
+ .checkValue(_ > 0, "must be positive number")
+ .createWithDefault(Duration.ofMinutes(5).toMillis)
+
//
///////////////////////////////////////////////////////////////////////////////////////////////
// SQL Engine Configuration
//
//
///////////////////////////////////////////////////////////////////////////////////////////////
@@ -2553,7 +2562,7 @@ object KyuubiConf {
val ENGINE_SUBMIT_TIMEOUT: ConfigEntry[Long] =
buildConf("kyuubi.engine.submit.timeout")
.doc("Period to tolerant Driver Pod ephemerally invisible after submitting. " +
- "In some Resource Managers, e.g. K8s, the Driver Pod is not invisible immediately " +
+ "In some Resource Managers, e.g. K8s, the Driver Pod is not visible immediately " +
"after `spark-submit` is returned.")
.version("1.7.1")
.timeConf
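The new `kyuubi.kubernetes.terminatedApplicationRetainPeriod` entry is a `timeConf` with a positivity check and a default of `Duration.ofMinutes(5).toMillis`, shown as `PT5M` in the settings table. The sketch below mirrors that validation with only `java.time`; `parseRetainPeriodMs` is hypothetical and accepts only ISO-8601 strings, which may be narrower than what Kyuubi's `timeConf` actually accepts.

```scala
import java.time.Duration

object RetainPeriodSketch {
  // Same default as KyuubiConf: Duration.ofMinutes(5).toMillis, documented as PT5M.
  val DefaultRetainPeriodMs: Long = Duration.ofMinutes(5).toMillis

  /** Hypothetical parser: accepts ISO-8601 durations such as "PT5M" and applies
    * the entry's checkValue(_ > 0, "must be positive number") rule. */
  def parseRetainPeriodMs(value: String): Long = {
    val ms = Duration.parse(value).toMillis
    require(ms > 0, s"$value must be positive number")
    ms
  }
}
```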
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
index 00db372ce..a2b3d0f76 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
@@ -100,6 +100,11 @@ case class ApplicationInfo(
}
}
+object ApplicationInfo {
+ val NOT_FOUND: ApplicationInfo = ApplicationInfo(null, null, ApplicationState.NOT_FOUND)
+ val UNKNOWN: ApplicationInfo = ApplicationInfo(null, null, ApplicationState.UNKNOWN)
+}
+
object ApplicationOperation {
val NOT_FOUND = "APPLICATION_NOT_FOUND"
}
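The new `ApplicationInfo` companion adds shared `NOT_FOUND` and `UNKNOWN` sentinels, which lets lookups use `ConcurrentHashMap.getOrDefault` instead of null checks. A trimmed-down sketch, assuming simplified stand-ins `State` and `AppInfo` for Kyuubi's `ApplicationState` and `ApplicationInfo` (the real case class carries more fields):

```scala
import java.util.concurrent.ConcurrentHashMap

object SentinelSketch {
  // Simplified stand-in for Kyuubi's ApplicationState enumeration.
  object State extends Enumeration {
    val PENDING, RUNNING, FINISHED, FAILED, NOT_FOUND, UNKNOWN = Value
  }

  // Simplified stand-in for ApplicationInfo; id and name are null for sentinels.
  case class AppInfo(id: String, name: String, state: State.Value, error: Option[String] = None)

  object AppInfo {
    val NOT_FOUND: AppInfo = AppInfo(null, null, State.NOT_FOUND)
    val UNKNOWN: AppInfo = AppInfo(null, null, State.UNKNOWN)
  }

  // Keyed by the Kyuubi unique tag, as appInfoStore is.
  val store = new ConcurrentHashMap[String, AppInfo]()

  // A missing tag yields the shared NOT_FOUND instance instead of null.
  def lookup(tag: String): AppInfo = store.getOrDefault(tag, AppInfo.NOT_FOUND)
}
```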
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index d0820b9ae..83792f52f 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -17,30 +17,54 @@
package org.apache.kyuubi.engine
-import java.util
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.informers.{ResourceEventHandler, SharedIndexInformer}
-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.ApplicationState.{ApplicationState, FAILED, FINISHED, PENDING, RUNNING, UNKNOWN}
-import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState, SPARK_APP_ID_LABEL}
+import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
+import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState, LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL}
import org.apache.kyuubi.util.KubernetesUtils
class KubernetesApplicationOperation extends ApplicationOperation with Logging {
@volatile
private var kubernetesClient: KubernetesClient = _
-
+ private var enginePodInformer: SharedIndexInformer[Pod] = _
private var submitTimeout: Long = _
+ // key is kyuubi_unique_key
+ private val appInfoStore: ConcurrentHashMap[String, ApplicationInfo] =
+ new ConcurrentHashMap[String, ApplicationInfo]
+ // key is kyuubi_unique_key
+ private var cleanupTerminatedAppInfoTrigger: Cache[String, ApplicationState] = _
+
override def initialize(conf: KyuubiConf): Unit = {
info("Start initializing Kubernetes Client.")
kubernetesClient = KubernetesUtils.buildKubernetesClient(conf) match {
case Some(client) =>
info(s"Initialized Kubernetes Client connect to: ${client.getMasterUrl}")
submitTimeout = conf.get(KyuubiConf.ENGINE_SUBMIT_TIMEOUT)
+ // Disable resync, see https://github.com/fabric8io/kubernetes-client/discussions/5015
+ enginePodInformer = client.pods()
+ .withLabel(LABEL_KYUUBI_UNIQUE_KEY)
+ .inform(new SparkEnginePodEventHandler)
+ info("Start Kubernetes Client Informer.")
+ // Defer cleaning terminated application information
+ val retainPeriod = conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD)
+ cleanupTerminatedAppInfoTrigger = CacheBuilder.newBuilder()
+ .expireAfterWrite(retainPeriod, TimeUnit.MILLISECONDS)
+ .removalListener((notification: RemovalNotification[String, ApplicationState]) => {
+ Option(appInfoStore.remove(notification.getKey)).foreach { removed =>
+ info(s"Remove terminated application ${removed.id} with " +
+ s"tag ${notification.getKey} and state ${removed.state}")
+ }
+ })
+ .build()
client
case None =>
warn("Fail to init Kubernetes Client for Kubernetes Application Operation")
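`initialize` wires a Guava `Cache` whose `expireAfterWrite` and removal listener drop a terminated application's entry from `appInfoStore` after the retain period. Per Guava's documentation, such caches evict lazily, during later writes (or occasional reads) or an explicit `cleanUp()`, rather than on a timer. The sketch below swaps in a JDK `ScheduledExecutorService` to show the same "retain, then forget" behavior with eager removal; `RetainingStore` and its methods are hypothetical and not part of this commit.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}

/** Hypothetical store that keeps an entry for `retainMs` after it is marked terminated.
  * Uses a JDK scheduler instead of the Guava cache used in this commit. */
class RetainingStore[V](retainMs: Long) {
  private val entries = new ConcurrentHashMap[String, V]()

  // Daemon thread so a forgotten stop() does not keep the JVM alive.
  private val scheduler: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "retain-cleanup")
        t.setDaemon(true)
        t
      }
    })

  def put(key: String, value: V): Unit = entries.put(key, value)

  def get(key: String): Option[V] = Option(entries.get(key))

  /** Schedules removal of `key` once the retain period has elapsed. */
  def markTerminated(key: String): Unit =
    scheduler.schedule(new Runnable {
      override def run(): Unit = entries.remove(key)
    }, retainMs, TimeUnit.MILLISECONDS)

  def stop(): Unit = scheduler.shutdownNow()
}
```

The trade-off: a scheduler removes entries on time but owns a thread, while the Guava cache needs no thread but may hold expired entries until the next cache activity.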
@@ -55,34 +79,26 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
}
override def killApplicationByTag(tag: String): KillResponse = {
- if (kubernetesClient != null) {
- debug(s"Deleting application info from Kubernetes cluster by $tag tag")
- try {
- // Need driver only
- val podList = findDriverPodByTag(tag)
- if (podList.size() != 0) {
- val targetPod = podList.get(0)
- toApplicationState(targetPod.getStatus.getPhase) match {
- case FAILED | UNKNOWN =>
- (
- false,
- s"Target Pod ${targetPod.getMetadata.getName} is in FAILED or UNKNOWN status")
- case _ =>
- (
- !kubernetesClient.pods.withName(targetPod.getMetadata.getName).delete().isEmpty,
- s"Operation of deleted appId: ${podList.get(0).getMetadata.getName} is completed")
- }
- } else {
+ if (kubernetesClient == null) {
+ throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
+ }
+ debug(s"Deleting application info from Kubernetes cluster by $tag tag")
+ try {
+ val info = appInfoStore.getOrDefault(tag, ApplicationInfo.NOT_FOUND)
+ debug(s"Application info[tag: $tag] is in ${info.state}")
+ info.state match {
+ case NOT_FOUND | FAILED | UNKNOWN =>
(
false,
- s"Target Pod(tag: $tag) is not found, due to pod have been deleted or not created")
- }
- } catch {
- case e: Exception =>
- (false, s"Failed to terminate application with $tag, due to ${e.getMessage}")
+ s"Target application[tag: $tag] is in ${info.state} status")
+ case _ =>
+ (
+ !kubernetesClient.pods.withName(info.name).delete().isEmpty,
+ s"Operation of deleted application[appId: ${info.id} ,tag: $tag] is completed")
}
- } else {
- throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
+ } catch {
+ case e: Exception =>
+ (false, s"Failed to terminate application with $tag, due to ${e.getMessage}")
}
}
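After this change, `killApplicationByTag` decides from the cached state alone: `NOT_FOUND`, `FAILED`, and `UNKNOWN` are reported as not killable, and any other state triggers a delete of the driver pod by name. A sketch of that branching as a pure function, assuming a hypothetical `deletePod` callback in place of `kubernetesClient.pods.withName(name).delete()`, reduced here to a Boolean:

```scala
object KillSketch {
  sealed trait State
  case object Pending extends State
  case object Running extends State
  case object Finished extends State
  case object Failed extends State
  case object NotFound extends State
  case object Unknown extends State

  final case class AppInfo(id: String, name: String, state: State)

  /** Mirrors the branching of killApplicationByTag; `deletePod` is a hypothetical
    * stand-in for the Kubernetes client call and returns true if a pod was deleted. */
  def kill(tag: String, info: AppInfo, deletePod: String => Boolean): (Boolean, String) =
    info.state match {
      case NotFound | Failed | Unknown =>
        (false, s"Target application[tag: $tag] is in ${info.state} status")
      case _ =>
        (deletePod(info.name),
          s"Operation of deleted application[appId: ${info.id}, tag: $tag] is completed")
    }
}
```

This is also why the integration test now checks that the message ends with `is completed` and contains the session tag.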
@@ -92,59 +108,101 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
}
debug(s"Getting application info from Kubernetes cluster by $tag tag")
try {
- val podList = findDriverPodByTag(tag)
- if (podList.size() != 0) {
- val pod = podList.get(0)
- val info = ApplicationInfo(
- // spark pods always tag label `spark-app-selector:<spark-app-id>`
- id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL),
- name = pod.getMetadata.getName,
- state = KubernetesApplicationOperation.toApplicationState(pod.getStatus.getPhase),
- error = Option(pod.getStatus.getReason))
- debug(s"Successfully got application info by $tag: $info")
- return info
- }
- // Kyuubi should wait second if pod is not be created
- submitTime match {
- case Some(time) =>
- val elapsedTime = System.currentTimeMillis() - time
+ val appInfo = appInfoStore.getOrDefault(tag, ApplicationInfo.NOT_FOUND)
+ (appInfo.state, submitTime) match {
+ // Kyuubi should wait second if pod is not be created
+ case (NOT_FOUND, Some(_submitTime)) =>
+ val elapsedTime = System.currentTimeMillis - _submitTime
if (elapsedTime > submitTimeout) {
error(s"Can't find target driver pod by tag: $tag, " +
s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.")
- ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
+ ApplicationInfo.NOT_FOUND
} else {
warn("Wait for driver pod to be created, " +
s"elapsed time: ${elapsedTime}ms, return UNKNOWN status")
- ApplicationInfo(id = null, name = null, ApplicationState.UNKNOWN)
+ ApplicationInfo.UNKNOWN
}
- case None =>
- ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
+ case (NOT_FOUND, None) =>
+ ApplicationInfo.NOT_FOUND
+ case _ =>
+ debug(s"Successfully got application info by $tag: $appInfo")
+ appInfo
}
} catch {
case e: Exception =>
error(s"Failed to get application with $tag, due to ${e.getMessage}")
- ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
+ ApplicationInfo.NOT_FOUND
}
}
- private def findDriverPodByTag(tag: String): util.List[Pod] = {
- val podList = kubernetesClient.pods()
- .withLabel(KubernetesApplicationOperation.LABEL_KYUUBI_UNIQUE_KEY, tag).list().getItems
- val size = podList.size()
- if (size != 1) {
- warn(s"Get Tag: ${tag} Driver Pod In Kubernetes size: ${size}, we expect 1")
+ override def stop(): Unit = {
+ Utils.tryLogNonFatalError {
+ if (enginePodInformer != null) {
+ enginePodInformer.stop()
+ enginePodInformer = null
+ }
}
- podList
- }
- override def stop(): Unit = {
- if (kubernetesClient != null) {
- try {
+ Utils.tryLogNonFatalError {
+ if (kubernetesClient != null) {
kubernetesClient.close()
- } catch {
- case e: Exception => error(e.getMessage)
+ kubernetesClient = null
}
}
+
+ if (cleanupTerminatedAppInfoTrigger != null) {
+ cleanupTerminatedAppInfoTrigger.cleanUp()
+ cleanupTerminatedAppInfoTrigger = null
+ }
+ }
+
+ private class SparkEnginePodEventHandler extends ResourceEventHandler[Pod] {
+
+ override def onAdd(pod: Pod): Unit = {
+ if (isSparkEnginePod(pod)) {
+ updateApplicationState(pod)
+ }
+ }
+
+ override def onUpdate(oldPod: Pod, newPod: Pod): Unit = {
+ if (isSparkEnginePod(newPod)) {
+ updateApplicationState(newPod)
+ val appState = toApplicationState(newPod.getStatus.getPhase)
+ if (isTerminated(appState)) {
+ markApplicationTerminated(newPod)
+ }
+ }
+ }
+
+ override def onDelete(pod: Pod, deletedFinalStateUnknown: Boolean): Unit = {
+ if (isSparkEnginePod(pod)) {
+ updateApplicationState(pod)
+ markApplicationTerminated(pod)
+ }
+ }
+ }
+
+ private def isSparkEnginePod(pod: Pod): Boolean = {
+ val labels = pod.getMetadata.getLabels
+ labels.containsKey(LABEL_KYUUBI_UNIQUE_KEY) && labels.containsKey(SPARK_APP_ID_LABEL)
+ }
+
+ private def updateApplicationState(pod: Pod): Unit = {
+ val appState = toApplicationState(pod.getStatus.getPhase)
+ debug(s"Driver Informer changes pod: ${pod.getMetadata.getName} to state: $appState")
+ appInfoStore.put(
+ pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
+ ApplicationInfo(
+ id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL),
+ name = pod.getMetadata.getName,
+ state = appState,
+ error = Option(pod.getStatus.getReason)))
+ }
+
+ private def markApplicationTerminated(pod: Pod): Unit = {
+ cleanupTerminatedAppInfoTrigger.put(
+ pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
+ toApplicationState(pod.getStatus.getPhase))
}
}
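The informer replaces per-request `list()` calls: `SparkEnginePodEventHandler` keeps `appInfoStore` current from add, update, and delete events, ignores pods lacking either label, and marks terminal pods for delayed cleanup. Below is a dependency-free sketch of that flow. `PodEvent`, `Pod`, and `AppInfo` are hypothetical stand-ins for fabric8's `ResourceEventHandler[Pod]` callbacks and model types; `spark-app-selector` comes from the comment removed in this diff, while the Kyuubi unique-key label name is a placeholder, and the set of terminal phases is an assumption standing in for `ApplicationState.isTerminated`.

```scala
import java.util.concurrent.ConcurrentHashMap

object InformerSketch {
  val KyuubiUniqueKey = "kyuubi-unique-key-placeholder" // placeholder label name
  val SparkAppIdLabel = "spark-app-selector"

  final case class Pod(name: String, labels: Map[String, String], phase: String)
  final case class AppInfo(id: String, name: String, phase: String)

  // Simplified stand-ins for the informer callbacks onAdd/onUpdate/onDelete.
  sealed trait PodEvent
  final case class Added(pod: Pod) extends PodEvent
  final case class Updated(oldPod: Pod, newPod: Pod) extends PodEvent
  final case class Deleted(pod: Pod) extends PodEvent

  // Keyed by the Kyuubi unique tag, like appInfoStore.
  val appInfoStore = new ConcurrentHashMap[String, AppInfo]()
  // Stand-in for the expiring cache: tags whose entries may later be dropped.
  val terminatedTags: java.util.Set[String] = ConcurrentHashMap.newKeySet[String]()

  // Assumed terminal phases, standing in for ApplicationState.isTerminated.
  private val terminalPhases = Set("Succeeded", "Failed", "Error")

  def isSparkEnginePod(pod: Pod): Boolean =
    pod.labels.contains(KyuubiUniqueKey) && pod.labels.contains(SparkAppIdLabel)

  private def update(pod: Pod): Unit =
    appInfoStore.put(
      pod.labels(KyuubiUniqueKey),
      AppInfo(pod.labels(SparkAppIdLabel), pod.name, pod.phase))

  private def markTerminated(pod: Pod): Unit =
    terminatedTags.add(pod.labels(KyuubiUniqueKey))

  def handle(event: PodEvent): Unit = event match {
    case Added(pod) if isSparkEnginePod(pod) =>
      update(pod)
    case Updated(_, pod) if isSparkEnginePod(pod) =>
      update(pod)
      if (terminalPhases(pod.phase)) markTerminated(pod)
    case Deleted(pod) if isSparkEnginePod(pod) =>
      update(pod)
      markTerminated(pod)
    case _ => // not a Spark engine pod: ignore
  }
}
```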
@@ -161,10 +219,10 @@ object KubernetesApplicationOperation extends Logging {
case "Running" => RUNNING
case "Succeeded" => FINISHED
case "Failed" | "Error" => FAILED
- case "Unknown" => ApplicationState.UNKNOWN
+ case "Unknown" => UNKNOWN
case _ =>
warn(s"The kubernetes driver pod state: $state is not supported, " +
"mark the application state as UNKNOWN.")
- ApplicationState.UNKNOWN
+ UNKNOWN
}
}
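`toApplicationState` maps the pod phase to an `ApplicationState`, and `getApplicationInfoByTag` treats a missing store entry as `UNKNOWN` while still within `kyuubi.engine.submit.timeout` of submission and as `NOT_FOUND` afterwards. A sketch combining both rules, assuming a simplified `AppState` enumeration; the `"Pending" -> PENDING` case is an assumption, since that part of the match falls outside the hunk shown.

```scala
object StateSketch {
  // Simplified stand-in for Kyuubi's ApplicationState.
  object AppState extends Enumeration {
    val PENDING, RUNNING, FINISHED, FAILED, NOT_FOUND, UNKNOWN = Value
  }
  import AppState._

  /** Pod phase to application state; unrecognized phases fall back to UNKNOWN. */
  def toApplicationState(phase: String): AppState.Value = phase match {
    case "Pending" => PENDING // assumed: this case is outside the hunk shown
    case "Running" => RUNNING
    case "Succeeded" => FINISHED
    case "Failed" | "Error" => FAILED
    case _ => UNKNOWN // covers "Unknown" and anything unexpected
  }

  /** Resolves a lookup with the submit-timeout grace period for a not-yet-visible pod. */
  def resolve(
      cached: Option[AppState.Value],
      submitTime: Option[Long],
      submitTimeoutMs: Long,
      nowMs: Long): AppState.Value =
    (cached, submitTime) match {
      case (Some(state), _) => state
      case (None, Some(t)) if nowMs - t > submitTimeoutMs => NOT_FOUND
      case (None, Some(_)) => UNKNOWN // driver pod may not be visible yet
      case (None, None) => NOT_FOUND
    }
}
```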