This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new d9e14f239 [KYUUBI #4623][K8S] KubernetesApplicationOperation uses 
Informer instead of list
d9e14f239 is described below

commit d9e14f239d5f27c3149519b70665fe36feaf0b3f
Author: zwangsheng <[email protected]>
AuthorDate: Fri Mar 31 15:21:59 2023 +0800

    [KYUUBI #4623][K8S] KubernetesApplicationOperation uses Informer instead of 
list
    
    ### _Why are the changes needed?_
    
    Close #4623
    
    Previously, Kyuubi used the Kubernetes client to poll the API server with a 
label selector to find the `spark driver pod`, which puts pressure on the API 
server when multiple REST applications are running at the same time. To reduce 
that pressure, use an informer, the Kubernetes-recommended method of 
maintaining the application state.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
 locally before making a pull request
    - [x] Run CI
    
    Closes #4625 from zwangsheng/KYUUBI_4623.
    
    Closes #4623
    
    a415bef7f [Cheng Pan] nit
    136d0db4d [Cheng Pan] 171
    b5d3c237a [Cheng Pan] re-generate conf
    bf14ad870 [Cheng Pan] nit
    9ee7e04f9 [Cheng Pan] nit
    301162ea0 [Cheng Pan] nit
    1d426922b [Cheng Pan] nit
    b95d7a650 [Cheng Pan] improve
    cc8d2c7f4 [zwangsheng] fix comments
    d017bafdf [zwangsheng] Set resycn 0
    28f9a70d9 [zwangsheng] Reorder func & slow get app info
    22d9c1662 [zwangsheng] fix setting
    8e0940334 [zwangsheng] fix comments
    10965d3df [zwangsheng] Rename fileter function => isSparkEnginePod
    b02677154 [zwangsheng] rename
    78c9fdb17 [zwangsheng] fix comments
    6d31f70d1 [zwangsheng] Fix IT Test
    f43bba2b9 [zwangsheng] fix
    17e4f55eb [zwangsheng] debug
    be8da790e [zwangsheng] debug
    0db45a513 [zwangsheng] retest
    a93786abc [zwangsheng] Fix style
    652ee837e [zwangsheng] Add Setting & Debug
    4add7e4e2 [zwangsheng] improve
    1f4341237 [zwangsheng] remove unused import
    35acd6106 [zwangsheng] fix compile
    05dfc598e [zwangsheng] [KYUUBI #4623][Improvement][K8S] Remove cached app 
info when out of time
    4ab530e99 [zwangsheng] [KYUUBI #4623][Improvement][K8S] 
kubernetesApplicationOperation Using Informer instead of list
    
    Lead-authored-by: zwangsheng <[email protected]>
    Co-authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 docs/deployment/settings.md                        |  25 +--
 .../test/spark/SparkOnKubernetesTestsSuite.scala   |  17 +-
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  11 +-
 .../kyuubi/engine/ApplicationOperation.scala       |   5 +
 .../engine/KubernetesApplicationOperation.scala    | 190 ++++++++++++++-------
 5 files changed, 160 insertions(+), 88 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index e7d939b9b..960f2c328 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -169,7 +169,7 @@ You can configure the Kyuubi properties in 
`$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.engine.spark.python.env.archive                   | &lt;undefined&gt; 
        | Portable Python env archive used for Spark engine Python language 
mode.                                                                           
                                                                                
                                                                                
                                                                                
                  [...]
 | kyuubi.engine.spark.python.env.archive.exec.path         | bin/python        
        | The Python exec path under the Python env archive.                    
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | kyuubi.engine.spark.python.home.archive                  | &lt;undefined&gt; 
        | Spark archive containing $SPARK_HOME/python directory, which is used 
to init session Python worker for Python language mode.                         
                                                                                
                                                                                
                                                                                
               [...]
-| kyuubi.engine.submit.timeout                             | PT30S             
        | Period to tolerant Driver Pod ephemerally invisible after submitting. 
In some Resource Managers, e.g. K8s, the Driver Pod is not invisible 
immediately after `spark-submit` is returned.                                   
                                                                                
                                                                                
                         [...]
+| kyuubi.engine.submit.timeout                             | PT30S             
        | Period to tolerant Driver Pod ephemerally invisible after submitting. 
In some Resource Managers, e.g. K8s, the Driver Pod is not visible immediately 
after `spark-submit` is returned.                                               
                                                                                
                                                                                
               [...]
 | kyuubi.engine.trino.event.loggers                        | JSON              
        | A comma-separated list of engine history loggers, where 
engine/session/operation etc events go.<ul> <li>JSON: the events will be 
written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to 
be done</li> <li>CUSTOM: to be done.</li></ul>                                  
                                                                                
                                    [...]
 | kyuubi.engine.trino.extra.classpath                      | &lt;undefined&gt; 
        | The extra classpath for the Trino query engine, for configuring other 
libs which may need by the Trino engine                                         
                                                                                
                                                                                
                                                                                
              [...]
 | kyuubi.engine.trino.java.options                         | &lt;undefined&gt; 
        | The extra Java options for the Trino query engine                     
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
@@ -292,17 +292,18 @@ You can configure the Kyuubi properties in 
`$KYUUBI_HOME/conf/kyuubi-defaults.co
 
 ### Kubernetes
 
-|                      Key                      |      Default      |          
                                                                                
            Meaning                                                             
                                         |  Type   | Since |
-|-----------------------------------------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|-------|
-| kyuubi.kubernetes.authenticate.caCertFile     | &lt;undefined&gt; | Path to 
the CA cert file for connecting to the Kubernetes API server over TLS from the 
kyuubi. Specify this as a path as opposed to a URI (i.e. do not provide a 
scheme)                                          | string  | 1.7.0 |
-| kyuubi.kubernetes.authenticate.clientCertFile | &lt;undefined&gt; | Path to 
the client cert file for connecting to the Kubernetes API server over TLS from 
the kyuubi. Specify this as a path as opposed to a URI (i.e. do not provide a 
scheme)                                      | string  | 1.7.0 |
-| kyuubi.kubernetes.authenticate.clientKeyFile  | &lt;undefined&gt; | Path to 
the client key file for connecting to the Kubernetes API server over TLS from 
the kyuubi. Specify this as a path as opposed to a URI (i.e. do not provide a 
scheme)                                       | string  | 1.7.0 |
-| kyuubi.kubernetes.authenticate.oauthToken     | &lt;undefined&gt; | The 
OAuth token to use when authenticating against the Kubernetes API server. Note 
that unlike, the other authentication options, this must be the exact string 
value of the token to use for the authentication. | string  | 1.7.0 |
-| kyuubi.kubernetes.authenticate.oauthTokenFile | &lt;undefined&gt; | Path to 
the file containing the OAuth token to use when authenticating against the 
Kubernetes API server. Specify this as a path as opposed to a URI (i.e. do not 
provide a scheme)                               | string  | 1.7.0 |
-| kyuubi.kubernetes.context                     | &lt;undefined&gt; | The 
desired context from your kubernetes config file used to configure the K8s 
client for interacting with the cluster.                                        
                                                   | string  | 1.6.0 |
-| kyuubi.kubernetes.master.address              | &lt;undefined&gt; | The 
internal Kubernetes master (API server) address to be used for kyuubi.          
                                                                                
                                              | string  | 1.7.0 |
-| kyuubi.kubernetes.namespace                   | default           | The 
namespace that will be used for running the kyuubi pods and find engines.       
                                                                                
                                              | string  | 1.7.0 |
-| kyuubi.kubernetes.trust.certificates          | false             | If set 
to true then client can submit to kubernetes cluster only with token            
                                                                                
                                           | boolean | 1.7.0 |
+|                         Key                         |      Default      |    
                                                                                
                  Meaning                                                       
                                               |   Type   | Since |
+|-----------------------------------------------------|-------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------|
+| kyuubi.kubernetes.authenticate.caCertFile           | &lt;undefined&gt; | 
Path to the CA cert file for connecting to the Kubernetes API server over TLS 
from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not 
provide a scheme)                                          | string   | 1.7.0 |
+| kyuubi.kubernetes.authenticate.clientCertFile       | &lt;undefined&gt; | 
Path to the client cert file for connecting to the Kubernetes API server over 
TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not 
provide a scheme)                                      | string   | 1.7.0 |
+| kyuubi.kubernetes.authenticate.clientKeyFile        | &lt;undefined&gt; | 
Path to the client key file for connecting to the Kubernetes API server over 
TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not 
provide a scheme)                                       | string   | 1.7.0 |
+| kyuubi.kubernetes.authenticate.oauthToken           | &lt;undefined&gt; | 
The OAuth token to use when authenticating against the Kubernetes API server. 
Note that unlike, the other authentication options, this must be the exact 
string value of the token to use for the authentication. | string   | 1.7.0 |
+| kyuubi.kubernetes.authenticate.oauthTokenFile       | &lt;undefined&gt; | 
Path to the file containing the OAuth token to use when authenticating against 
the Kubernetes API server. Specify this as a path as opposed to a URI (i.e. do 
not provide a scheme)                               | string   | 1.7.0 |
+| kyuubi.kubernetes.context                           | &lt;undefined&gt; | 
The desired context from your kubernetes config file used to configure the K8s 
client for interacting with the cluster.                                        
                                                   | string   | 1.6.0 |
+| kyuubi.kubernetes.master.address                    | &lt;undefined&gt; | 
The internal Kubernetes master (API server) address to be used for kyuubi.      
                                                                                
                                                  | string   | 1.7.0 |
+| kyuubi.kubernetes.namespace                         | default           | 
The namespace that will be used for running the kyuubi pods and find engines.   
                                                                                
                                                  | string   | 1.7.0 |
+| kyuubi.kubernetes.terminatedApplicationRetainPeriod | PT5M              | 
The period for which the Kyuubi server retains application information after 
the application terminates.                                                     
                                                     | duration | 1.7.1 |
+| kyuubi.kubernetes.trust.certificates                | false             | If 
set to true then client can submit to kubernetes cluster only with token        
                                                                                
                                               | boolean  | 1.7.0 |
 
 ### Metadata
 
diff --git 
a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
 
b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
index 831504608..9ddcc5937 100644
--- 
a/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
+++ 
b/integration-tests/kyuubi-kubernetes-it/src/test/scala/org/apache/kyuubi/kubernetes/test/spark/SparkOnKubernetesTestsSuite.scala
@@ -208,19 +208,18 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
       batchRequest.getConf.asScala.toMap,
       batchRequest)
 
-    val session = 
sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSessionImpl]
-    val batchJobSubmissionOp = session.batchJobSubmissionOp
-
-    eventually(timeout(3.minutes), interval(50.milliseconds)) {
-      val appInfo = batchJobSubmissionOp.getOrFetchCurrentApplicationInfo
-      assert(appInfo.nonEmpty)
-      assert(appInfo.exists(_.state == RUNNING))
-      assert(appInfo.exists(_.name.startsWith(driverPodNamePrefix)))
+    // wait for driver pod start
+    eventually(timeout(3.minutes), interval(5.second)) {
+      // trigger k8sOperation init here
+      val appInfo = 
k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
+      assert(appInfo.state == RUNNING)
+      assert(appInfo.name.startsWith(driverPodNamePrefix))
     }
 
     val killResponse = 
k8sOperation.killApplicationByTag(sessionHandle.identifier.toString)
     assert(killResponse._1)
-    assert(killResponse._2 startsWith "Operation of deleted appId:")
+    assert(killResponse._2 endsWith "is completed")
+    assert(killResponse._2 contains sessionHandle.identifier.toString)
 
     eventually(timeout(3.minutes), interval(50.milliseconds)) {
       val appInfo = 
k8sOperation.getApplicationInfoByTag(sessionHandle.identifier.toString)
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 5e7755952..b5229e2ad 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1171,6 +1171,15 @@ object KyuubiConf {
       .booleanConf
       .createWithDefault(false)
 
+  val KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD: ConfigEntry[Long] =
+    buildConf("kyuubi.kubernetes.terminatedApplicationRetainPeriod")
+      .doc("The period for which the Kyuubi server retains application 
information after " +
+        "the application terminates.")
+      .version("1.7.1")
+      .timeConf
+      .checkValue(_ > 0, "must be positive number")
+      .createWithDefault(Duration.ofMinutes(5).toMillis)
+
   // 
///////////////////////////////////////////////////////////////////////////////////////////////
   //                                 SQL Engine Configuration                  
                  //
   // 
///////////////////////////////////////////////////////////////////////////////////////////////
@@ -2553,7 +2562,7 @@ object KyuubiConf {
   val ENGINE_SUBMIT_TIMEOUT: ConfigEntry[Long] =
     buildConf("kyuubi.engine.submit.timeout")
       .doc("Period to tolerant Driver Pod ephemerally invisible after 
submitting. " +
-        "In some Resource Managers, e.g. K8s, the Driver Pod is not invisible 
immediately " +
+        "In some Resource Managers, e.g. K8s, the Driver Pod is not visible 
immediately " +
         "after `spark-submit` is returned.")
       .version("1.7.1")
       .timeConf
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
index 00db372ce..a2b3d0f76 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
@@ -100,6 +100,11 @@ case class ApplicationInfo(
   }
 }
 
+object ApplicationInfo {
+  val NOT_FOUND: ApplicationInfo = ApplicationInfo(null, null, 
ApplicationState.NOT_FOUND)
+  val UNKNOWN: ApplicationInfo = ApplicationInfo(null, null, 
ApplicationState.UNKNOWN)
+}
+
 object ApplicationOperation {
   val NOT_FOUND = "APPLICATION_NOT_FOUND"
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index d0820b9ae..83792f52f 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -17,30 +17,54 @@
 
 package org.apache.kyuubi.engine
 
-import java.util
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
+import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification}
 import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.informers.{ResourceEventHandler, 
SharedIndexInformer}
 
-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{Logging, Utils}
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.ApplicationState.{ApplicationState, FAILED, 
FINISHED, PENDING, RUNNING, UNKNOWN}
-import 
org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState, 
SPARK_APP_ID_LABEL}
+import org.apache.kyuubi.engine.ApplicationState.{isTerminated, 
ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN}
+import 
org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState, 
LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL}
 import org.apache.kyuubi.util.KubernetesUtils
 
 class KubernetesApplicationOperation extends ApplicationOperation with Logging 
{
 
   @volatile
   private var kubernetesClient: KubernetesClient = _
-
+  private var enginePodInformer: SharedIndexInformer[Pod] = _
   private var submitTimeout: Long = _
 
+  // key is kyuubi_unique_key
+  private val appInfoStore: ConcurrentHashMap[String, ApplicationInfo] =
+    new ConcurrentHashMap[String, ApplicationInfo]
+  // key is kyuubi_unique_key
+  private var cleanupTerminatedAppInfoTrigger: Cache[String, ApplicationState] 
= _
+
   override def initialize(conf: KyuubiConf): Unit = {
     info("Start initializing Kubernetes Client.")
     kubernetesClient = KubernetesUtils.buildKubernetesClient(conf) match {
       case Some(client) =>
         info(s"Initialized Kubernetes Client connect to: 
${client.getMasterUrl}")
         submitTimeout = conf.get(KyuubiConf.ENGINE_SUBMIT_TIMEOUT)
+        // Disable resync, see 
https://github.com/fabric8io/kubernetes-client/discussions/5015
+        enginePodInformer = client.pods()
+          .withLabel(LABEL_KYUUBI_UNIQUE_KEY)
+          .inform(new SparkEnginePodEventHandler)
+        info("Start Kubernetes Client Informer.")
+        // Defer cleaning terminated application information
+        val retainPeriod = 
conf.get(KyuubiConf.KUBERNETES_TERMINATED_APPLICATION_RETAIN_PERIOD)
+        cleanupTerminatedAppInfoTrigger = CacheBuilder.newBuilder()
+          .expireAfterWrite(retainPeriod, TimeUnit.MILLISECONDS)
+          .removalListener((notification: RemovalNotification[String, 
ApplicationState]) => {
+            Option(appInfoStore.remove(notification.getKey)).foreach { removed 
=>
+              info(s"Remove terminated application ${removed.id} with " +
+                s"tag ${notification.getKey} and state ${removed.state}")
+            }
+          })
+          .build()
         client
       case None =>
         warn("Fail to init Kubernetes Client for Kubernetes Application 
Operation")
@@ -55,34 +79,26 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
   }
 
   override def killApplicationByTag(tag: String): KillResponse = {
-    if (kubernetesClient != null) {
-      debug(s"Deleting application info from Kubernetes cluster by $tag tag")
-      try {
-        // Need driver only
-        val podList = findDriverPodByTag(tag)
-        if (podList.size() != 0) {
-          val targetPod = podList.get(0)
-          toApplicationState(targetPod.getStatus.getPhase) match {
-            case FAILED | UNKNOWN =>
-              (
-                false,
-                s"Target Pod ${targetPod.getMetadata.getName} is in FAILED or 
UNKNOWN status")
-            case _ =>
-              (
-                
!kubernetesClient.pods.withName(targetPod.getMetadata.getName).delete().isEmpty,
-                s"Operation of deleted appId: 
${podList.get(0).getMetadata.getName} is completed")
-          }
-        } else {
+    if (kubernetesClient == null) {
+      throw new IllegalStateException("Methods initialize and isSupported must 
be called ahead")
+    }
+    debug(s"Deleting application info from Kubernetes cluster by $tag tag")
+    try {
+      val info = appInfoStore.getOrDefault(tag, ApplicationInfo.NOT_FOUND)
+      debug(s"Application info[tag: $tag] is in ${info.state}")
+      info.state match {
+        case NOT_FOUND | FAILED | UNKNOWN =>
           (
             false,
-            s"Target Pod(tag: $tag) is not found, due to pod have been deleted 
or not created")
-        }
-      } catch {
-        case e: Exception =>
-          (false, s"Failed to terminate application with $tag, due to 
${e.getMessage}")
+            s"Target application[tag: $tag] is in ${info.state} status")
+        case _ =>
+          (
+            !kubernetesClient.pods.withName(info.name).delete().isEmpty,
+            s"Operation of deleted application[appId: ${info.id} ,tag: $tag] 
is completed")
       }
-    } else {
-      throw new IllegalStateException("Methods initialize and isSupported must 
be called ahead")
+    } catch {
+      case e: Exception =>
+        (false, s"Failed to terminate application with $tag, due to 
${e.getMessage}")
     }
   }
 
@@ -92,59 +108,101 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
     }
     debug(s"Getting application info from Kubernetes cluster by $tag tag")
     try {
-      val podList = findDriverPodByTag(tag)
-      if (podList.size() != 0) {
-        val pod = podList.get(0)
-        val info = ApplicationInfo(
-          // spark pods always tag label `spark-app-selector:<spark-app-id>`
-          id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL),
-          name = pod.getMetadata.getName,
-          state = 
KubernetesApplicationOperation.toApplicationState(pod.getStatus.getPhase),
-          error = Option(pod.getStatus.getReason))
-        debug(s"Successfully got application info by $tag: $info")
-        return info
-      }
-      // Kyuubi should wait second if pod is not be created
-      submitTime match {
-        case Some(time) =>
-          val elapsedTime = System.currentTimeMillis() - time
+      val appInfo = appInfoStore.getOrDefault(tag, ApplicationInfo.NOT_FOUND)
+      (appInfo.state, submitTime) match {
+        // Kyuubi should wait second if pod is not be created
+        case (NOT_FOUND, Some(_submitTime)) =>
+          val elapsedTime = System.currentTimeMillis - _submitTime
           if (elapsedTime > submitTimeout) {
             error(s"Can't find target driver pod by tag: $tag, " +
               s"elapsed time: ${elapsedTime}ms exceeds ${submitTimeout}ms.")
-            ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
+            ApplicationInfo.NOT_FOUND
           } else {
             warn("Wait for driver pod to be created, " +
               s"elapsed time: ${elapsedTime}ms, return UNKNOWN status")
-            ApplicationInfo(id = null, name = null, ApplicationState.UNKNOWN)
+            ApplicationInfo.UNKNOWN
           }
-        case None =>
-          ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
+        case (NOT_FOUND, None) =>
+          ApplicationInfo.NOT_FOUND
+        case _ =>
+          debug(s"Successfully got application info by $tag: $appInfo")
+          appInfo
       }
     } catch {
       case e: Exception =>
         error(s"Failed to get application with $tag, due to ${e.getMessage}")
-        ApplicationInfo(id = null, name = null, ApplicationState.NOT_FOUND)
+        ApplicationInfo.NOT_FOUND
     }
   }
 
-  private def findDriverPodByTag(tag: String): util.List[Pod] = {
-    val podList = kubernetesClient.pods()
-      .withLabel(KubernetesApplicationOperation.LABEL_KYUUBI_UNIQUE_KEY, 
tag).list().getItems
-    val size = podList.size()
-    if (size != 1) {
-      warn(s"Get Tag: ${tag} Driver Pod In Kubernetes size: ${size}, we expect 
1")
+  override def stop(): Unit = {
+    Utils.tryLogNonFatalError {
+      if (enginePodInformer != null) {
+        enginePodInformer.stop()
+        enginePodInformer = null
+      }
     }
-    podList
-  }
 
-  override def stop(): Unit = {
-    if (kubernetesClient != null) {
-      try {
+    Utils.tryLogNonFatalError {
+      if (kubernetesClient != null) {
         kubernetesClient.close()
-      } catch {
-        case e: Exception => error(e.getMessage)
+        kubernetesClient = null
       }
     }
+
+    if (cleanupTerminatedAppInfoTrigger != null) {
+      cleanupTerminatedAppInfoTrigger.cleanUp()
+      cleanupTerminatedAppInfoTrigger = null
+    }
+  }
+
+  private class SparkEnginePodEventHandler extends ResourceEventHandler[Pod] {
+
+    override def onAdd(pod: Pod): Unit = {
+      if (isSparkEnginePod(pod)) {
+        updateApplicationState(pod)
+      }
+    }
+
+    override def onUpdate(oldPod: Pod, newPod: Pod): Unit = {
+      if (isSparkEnginePod(newPod)) {
+        updateApplicationState(newPod)
+        val appState = toApplicationState(newPod.getStatus.getPhase)
+        if (isTerminated(appState)) {
+          markApplicationTerminated(newPod)
+        }
+      }
+    }
+
+    override def onDelete(pod: Pod, deletedFinalStateUnknown: Boolean): Unit = 
{
+      if (isSparkEnginePod(pod)) {
+        updateApplicationState(pod)
+        markApplicationTerminated(pod)
+      }
+    }
+  }
+
+  private def isSparkEnginePod(pod: Pod): Boolean = {
+    val labels = pod.getMetadata.getLabels
+    labels.containsKey(LABEL_KYUUBI_UNIQUE_KEY) && 
labels.containsKey(SPARK_APP_ID_LABEL)
+  }
+
+  private def updateApplicationState(pod: Pod): Unit = {
+    val appState = toApplicationState(pod.getStatus.getPhase)
+    debug(s"Driver Informer changes pod: ${pod.getMetadata.getName} to state: 
$appState")
+    appInfoStore.put(
+      pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
+      ApplicationInfo(
+        id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL),
+        name = pod.getMetadata.getName,
+        state = appState,
+        error = Option(pod.getStatus.getReason)))
+  }
+
+  private def markApplicationTerminated(pod: Pod): Unit = {
+    cleanupTerminatedAppInfoTrigger.put(
+      pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY),
+      toApplicationState(pod.getStatus.getPhase))
   }
 }
 
@@ -161,10 +219,10 @@ object KubernetesApplicationOperation extends Logging {
     case "Running" => RUNNING
     case "Succeeded" => FINISHED
     case "Failed" | "Error" => FAILED
-    case "Unknown" => ApplicationState.UNKNOWN
+    case "Unknown" => UNKNOWN
     case _ =>
       warn(s"The kubernetes driver pod state: $state is not supported, " +
         "mark the application state as UNKNOWN.")
-      ApplicationState.UNKNOWN
+      UNKNOWN
   }
 }

Reply via email to