This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new be6133a  [KYUUBI #1816] Implement KyuubiHistoryServerPlugin
be6133a is described below

commit be6133ae99c3c702587e9eaf1ca63fea99aa72f9
Author: Wang Zhen <[email protected]>
AuthorDate: Thu Feb 10 09:38:13 2022 +0800

    [KYUUBI #1816] Implement KyuubiHistoryServerPlugin
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: 
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in 
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your 
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., 
'[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    Implement KyuubiHistoryServerPlugin. #1816
    
    ### _How was this patch tested?_
    - [X] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [X] Add screenshots for manual tests if appropriate
    **SparkUI:**
    
![sparkui001](https://user-images.githubusercontent.com/17894939/151120910-82027528-4850-433b-89a7-8eee653b79d8.png)
    
![sparkui002](https://user-images.githubusercontent.com/17894939/151120917-60887aec-a905-4a3e-a346-cb646d5ed0d1.png)
    
    **SparkHistoryServer:**
    
![history001](https://user-images.githubusercontent.com/17894939/151120931-fac35425-e9df-409d-8de8-5cba513d7e45.png)
    
![history002](https://user-images.githubusercontent.com/17894939/151120995-06ec7366-38bc-45ff-ade8-d7201c606a77.png)
    
    - [X] [Run 
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #1820 from wForget/KYUUBI-1816.
    
    Closes #1816
    
    a3256920 [Wang Zhen] fix test
    256363d8 [Wang Zhen] fix
    0aecc305 [Wang Zhen] load kyuubiConf from sparkConf
    86608fa5 [wForget] fix tests
    2481721b [wForget] fix
    d4f5825d [wForget] Merge remote-tracking branch 'origin/master' into 
KYUUBI-1816
    18c18458 [wForget] fix checkstyle
    c21f8b10 [wForget] [KYUUBI-1816] add AppHistoryServerPlugin services
    0adf3634 [Wang Zhen] [KYUUBI-1816] Add KyuubiHistoryServerPlugin
    8f5b1196 [Wang Zhen] [KYUUBI-1816] Refactor EngineEventsStore to get Events 
through ElementTrackingStore.
    678396d1 [Wang Zhen] [KYUUBI-1816] Add SparkSQLEngineEventListener
    fcc1a925 [Wang Zhen] [KYUUBI-1816] Add the @KVIndex property to the event, 
which will be used for KvStore
    
    Lead-authored-by: Wang Zhen <[email protected]>
    Co-authored-by: wForget <[email protected]>
    Signed-off-by: ulysses-you <[email protected]>
---
 .rat-excludes                                      |   1 +
 .../org.apache.spark.status.AppHistoryServerPlugin |   1 +
 .../kyuubi/engine/spark/KyuubiSparkUtil.scala      |   5 +
 .../kyuubi/engine/spark/SparkSQLEngine.scala       |  21 +-
 .../engine/spark/events/EngineEventsStore.scala    | 101 ++-------
 .../kyuubi/engine/spark/events/SessionEvent.scala  |   8 +-
 .../engine/spark/events/SparkOperationEvent.scala  |   8 +-
 .../apache/spark/kyuubi/SparkContextHelper.scala   |  10 +
 .../spark/kyuubi/SparkSQLEngineEventListener.scala |  98 +++++++++
 .../spark/kyuubi/SparkSQLEngineListener.scala      |  20 +-
 .../scala/org/apache/spark/ui/EnginePage.scala     |  42 ++--
 .../org/apache/spark/ui/EngineSessionPage.scala    |  43 ++--
 .../main/scala/org/apache/spark/ui/EngineTab.scala |  31 ++-
 .../spark/ui/KyuubiHistoryServerPlugin.scala       |  64 ++++++
 .../spark/events/EngineEventsStoreSuite.scala      | 243 +++++----------------
 .../scala/org/apache/spark/ui/EngineTabSuite.scala |  14 ++
 16 files changed, 369 insertions(+), 341 deletions(-)

diff --git a/.rat-excludes b/.rat-excludes
index 709e852..5194ffe 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -39,3 +39,4 @@ build/scala-*/**
 **/*.output.schema
 **/apache-kyuubi-*-bin*/**
 **/benchmarks/**
+**/org.apache.spark.status.AppHistoryServerPlugin
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
 
b/externals/kyuubi-spark-sql-engine/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
new file mode 100644
index 0000000..2be4ed7
--- /dev/null
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
@@ -0,0 +1 @@
+org.apache.spark.ui.KyuubiHistoryServerPlugin
\ No newline at end of file
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala
index 51752d0..3a708e0 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala
@@ -19,13 +19,18 @@ package org.apache.kyuubi.engine.spark
 
 import java.time.{Instant, LocalDateTime, ZoneId}
 
+import scala.annotation.meta.getter
+
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.kvstore.KVIndex
 
 import org.apache.kyuubi.Utils
 
 object KyuubiSparkUtil {
 
+  type KVIndexParam = KVIndex @getter
+
   final val SPARK_SCHEDULER_POOL_KEY = "spark.scheduler.pool"
   final val SPARK_SQL_EXECUTION_ID_KEY = "spark.sql.execution.id"
 
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index e2cf3ad..f884fec 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch
 import scala.util.control.NonFatal
 
 import org.apache.spark.{ui, SparkConf}
+import org.apache.spark.kyuubi.{SparkContextHelper, 
SparkSQLEngineEventListener, SparkSQLEngineListener}
 import org.apache.spark.kyuubi.SparkSQLEngineListener
 import org.apache.spark.kyuubi.SparkUtilsHelper.getLocalDir
 import org.apache.spark.sql.SparkSession
@@ -38,16 +39,17 @@ import org.apache.kyuubi.ha.client.RetryPolicies
 import org.apache.kyuubi.service.Serverable
 import org.apache.kyuubi.util.SignalRegister
 
-case class SparkSQLEngine(
-    spark: SparkSession,
-    store: EngineEventsStore) extends Serverable("SparkSQLEngine") {
+case class SparkSQLEngine(spark: SparkSession) extends 
Serverable("SparkSQLEngine") {
 
   override val backendService = new SparkSQLBackendService(spark)
   override val frontendServices = Seq(new SparkTBinaryFrontendService(this))
 
   override def initialize(conf: KyuubiConf): Unit = {
-    val listener = new SparkSQLEngineListener(this, store)
+    val listener = new SparkSQLEngineListener(this)
     spark.sparkContext.addSparkListener(listener)
+    val kvStore = SparkContextHelper.getKvStore(spark.sparkContext)
+    val engineEventListener = new SparkSQLEngineEventListener(kvStore, conf)
+    spark.sparkContext.addSparkListener(engineEventListener)
     super.initialize(conf)
   }
 
@@ -122,8 +124,7 @@ object SparkSQLEngine extends Logging {
   }
 
   def startEngine(spark: SparkSession): Unit = {
-    val store = new EngineEventsStore(kyuubiConf)
-    currentEngine = Some(new SparkSQLEngine(spark, store))
+    currentEngine = Some(new SparkSQLEngine(spark))
     currentEngine.foreach { engine =>
       // start event logging ahead so that we can capture all statuses
       val eventLogging = new EventLoggingService(spark.sparkContext)
@@ -145,7 +146,13 @@ object SparkSQLEngine extends Logging {
       }
       try {
         engine.start()
-        ui.EngineTab(engine)
+        val kvStore = SparkContextHelper.getKvStore(spark.sparkContext)
+        val store = new EngineEventsStore(kvStore)
+        ui.EngineTab(
+          Some(engine),
+          SparkContextHelper.getSparkUI(spark.sparkContext),
+          store,
+          kyuubiConf)
         val event = EngineEvent(engine)
         info(event)
         EventLoggingService.onEvent(event)
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala
index d05cd8e..d69f0e2 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala
@@ -17,12 +17,9 @@
 
 package org.apache.kyuubi.engine.spark.events
 
-import java.util.concurrent.ConcurrentHashMap
+import scala.collection.JavaConverters._
 
-import scala.collection.JavaConverters.collectionAsScalaIterableConverter
-
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_UI_SESSION_LIMIT, 
ENGINE_UI_STATEMENT_LIMIT}
+import org.apache.spark.util.kvstore.KVStore
 
 /**
  * A memory store that tracking the number of statements and sessions, it 
provides:
@@ -32,106 +29,44 @@ import 
org.apache.kyuubi.config.KyuubiConf.{ENGINE_UI_SESSION_LIMIT, ENGINE_UI_S
  * 1). remove the finished events first.
  * 2). remove the active events if still reach the threshold.
  */
-class EngineEventsStore(conf: KyuubiConf) {
-
-  /**
-   * The number of SQL client sessions kept in the Kyuubi Query Engine web UI.
-   */
-  private val retainedSessions: Int = conf.get(ENGINE_UI_SESSION_LIMIT)
-
-  /**
-   * The number of statements kept in the Kyuubi Query Engine web UI.
-   */
-  private val retainedStatements: Int = conf.get(ENGINE_UI_STATEMENT_LIMIT)
-
-  /**
-   * store all session events.
-   */
-  val sessions = new ConcurrentHashMap[String, SessionEvent]
+class EngineEventsStore(store: KVStore) {
 
   /**
    * get all session events order by startTime
    */
   def getSessionList: Seq[SessionEvent] = {
-    sessions.values().asScala.toSeq.sortBy(_.startTime)
+    store.view(classOf[SessionEvent]).asScala.toSeq
   }
 
   def getSession(sessionId: String): Option[SessionEvent] = {
-    Option(sessions.get(sessionId))
-  }
-
-  /**
-   * save session events and check the capacity threshold
-   */
-  def saveSession(sessionEvent: SessionEvent): Unit = {
-    sessions.put(sessionEvent.sessionId, sessionEvent)
-    checkSessionCapacity()
-  }
-
-  /**
-   * cleanup the session events if reach the threshold
-   */
-  private def checkSessionCapacity(): Unit = {
-    var countToDelete = sessions.size - retainedSessions
-
-    val reverseSeq = 
sessions.values().asScala.toSeq.sortBy(_.startTime).reverse
-
-    // remove finished sessions first.
-    for (event <- reverseSeq if event.endTime != -1L && countToDelete > 0) {
-      sessions.remove(event.sessionId)
-      countToDelete -= 1
-    }
-
-    // remove active event if still reach the threshold
-    for (event <- reverseSeq if countToDelete > 0) {
-      sessions.remove(event.sessionId)
-      countToDelete -= 1
+    try {
+      Some(store.read(classOf[SessionEvent], sessionId))
+    } catch {
+      case _: NoSuchElementException => None
     }
   }
 
   /**
-   * store all statements events.
-   */
-  val statements = new ConcurrentHashMap[String, SparkOperationEvent]
-
-  /**
    * get all statement events order by startTime
    */
   def getStatementList: Seq[SparkOperationEvent] = {
-    statements.values().asScala.toSeq.sortBy(_.createTime)
+    store.view(classOf[SparkOperationEvent]).asScala.toSeq
   }
 
   def getStatement(statementId: String): Option[SparkOperationEvent] = {
-    Option(statements.get(statementId))
+    try {
+      Some(store.read(classOf[SparkOperationEvent], statementId))
+    } catch {
+      case _: NoSuchElementException => None
+    }
   }
 
-  /**
-   * save statement events and check the capacity threshold
-   */
-  def saveStatement(statementEvent: SparkOperationEvent): Unit = {
-    statements.put(statementEvent.statementId, statementEvent)
-    checkStatementCapacity()
+  def getSessionCount: Long = {
+    store.count(classOf[SessionEvent])
   }
 
-  /**
-   * cleanup the statement events if reach the threshold
-   */
-  private def checkStatementCapacity(): Unit = {
-    var countToDelete = statements.size - retainedStatements
-
-    val reverseSeq = 
statements.values().asScala.toSeq.sortBy(_.createTime).reverse
-
-    //  remove finished statements first.
-    for (event <- reverseSeq if event.completeTime != -1L && countToDelete > 
0) {
-      statements.remove(event.statementId)
-      countToDelete -= 1
-    }
-
-    // remove active event if still reach the threshold
-    for (event <- reverseSeq if countToDelete > 0) {
-      statements.remove(event.statementId)
-      countToDelete -= 1
-    }
+  def getStatementCount: Long = {
+    store.count(classOf[SparkOperationEvent])
   }
 
 }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
index ce21c39..670d085 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
@@ -17,11 +17,14 @@
 
 package org.apache.kyuubi.engine.spark.events
 
+import com.fasterxml.jackson.annotation.JsonIgnore
 import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.kvstore.KVIndex
 
 import org.apache.kyuubi.Utils
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.KVIndexParam
 import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
 
 /**
@@ -35,7 +38,7 @@ import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
  * @param totalOperations how many queries and meta calls
  */
 case class SessionEvent(
-    sessionId: String,
+    @KVIndexParam sessionId: String,
     engineId: String,
     username: String,
     ip: String,
@@ -55,6 +58,9 @@ case class SessionEvent(
       endTime - startTime
     }
   }
+
+  @JsonIgnore @KVIndex("endTime")
+  private def endTimeIndex: Long = if (endTime > 0L) endTime else -1L
 }
 
 object SessionEvent {
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
index 87fbc87..d496989 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
@@ -17,10 +17,13 @@
 
 package org.apache.kyuubi.engine.spark.events
 
+import com.fasterxml.jackson.annotation.JsonIgnore
 import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.kvstore.KVIndex
 
 import org.apache.kyuubi.Utils
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.KVIndexParam
 import org.apache.kyuubi.engine.spark.operation.SparkOperation
 
 /**
@@ -45,7 +48,7 @@ import org.apache.kyuubi.engine.spark.operation.SparkOperation
  * @param executionId the query execution id of this operation
  */
 case class SparkOperationEvent(
-    statementId: String,
+    @KVIndexParam statementId: String,
     statement: String,
     shouldRunAsync: Boolean,
     state: String,
@@ -69,6 +72,9 @@ case class SparkOperationEvent(
       completeTime - createTime
     }
   }
+
+  @JsonIgnore @KVIndex("completeTime")
+  private def completeTimeIndex: Long = if (completeTime > 0L) completeTime 
else -1L
 }
 
 object SparkOperationEvent {
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
index 87e74db..ba67151 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
@@ -24,6 +24,8 @@ import org.apache.spark.scheduler.SchedulerBackend
 import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
+import org.apache.spark.status.ElementTrackingStore
+import org.apache.spark.ui.SparkUI
 
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
@@ -40,6 +42,14 @@ object SparkContextHelper extends Logging {
     new SparkHistoryEventLogger(sc)
   }
 
+  def getKvStore(sc: SparkContext): ElementTrackingStore = {
+    sc.statusStore.store.asInstanceOf[ElementTrackingStore]
+  }
+
+  def getSparkUI(sc: SparkContext): Option[SparkUI] = {
+    sc.ui
+  }
+
   def updateDelegationTokens(sc: SparkContext, creds: Credentials): Unit = {
     val bytes = SparkHadoopUtil.get.serialize(creds)
     sc.schedulerBackend match {
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineEventListener.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineEventListener.scala
new file mode 100644
index 0000000..52e50b5
--- /dev/null
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineEventListener.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi
+
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_UI_SESSION_LIMIT, 
ENGINE_UI_STATEMENT_LIMIT}
+import org.apache.kyuubi.engine.spark.events.{SessionEvent, 
SparkOperationEvent}
+
+class SparkSQLEngineEventListener(
+    kvstore: ElementTrackingStore,
+    kyuubiConf: KyuubiConf) extends SparkListener {
+
+  /**
+   * The number of SQL client sessions kept in the Kyuubi Query Engine web UI.
+   */
+  private val retainedSessions: Int = kyuubiConf.get(ENGINE_UI_SESSION_LIMIT)
+
+  /**
+   * The number of statements kept in the Kyuubi Query Engine web UI.
+   */
+  private val retainedStatements: Int = 
kyuubiConf.get(ENGINE_UI_STATEMENT_LIMIT)
+
+  kvstore.addTrigger(classOf[SessionEvent], retainedSessions) { count =>
+    cleanupSession(count)
+  }
+
+  kvstore.addTrigger(classOf[SparkOperationEvent], retainedStatements) { count 
=>
+    cleanupOperation(count)
+  }
+
+  override def onOtherEvent(event: SparkListenerEvent): Unit = {
+    event match {
+      case e: SessionEvent => updateSessionStore(e)
+      case e: SparkOperationEvent => updateStatementStore(e)
+      case _ => // Ignore
+    }
+  }
+
+  private def updateSessionStore(event: SessionEvent): Unit = {
+    kvstore.write(event, event.endTime == -1L)
+  }
+
+  private def updateStatementStore(event: SparkOperationEvent): Unit = {
+    kvstore.write(event, true)
+  }
+
+  private def cleanupSession(count: Long): Unit = {
+    val countToDelete = calculateNumberToRemove(count, retainedSessions)
+    if (countToDelete <= 0L) {
+      return
+    }
+    val view = kvstore.view(classOf[SessionEvent]).index("endTime").first(0L)
+    val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j =>
+      j.endTime != 0L
+    }
+
+    toDelete.foreach { j => kvstore.delete(j.getClass, j.sessionId) }
+  }
+
+  private def cleanupOperation(count: Long): Unit = {
+    val countToDelete = calculateNumberToRemove(count, retainedStatements)
+    if (countToDelete <= 0L) {
+      return
+    }
+    val view = 
kvstore.view(classOf[SparkOperationEvent]).index("completeTime").first(0L)
+    val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j =>
+      j.completeTime != 0
+    }
+    toDelete.foreach { j => kvstore.delete(j.getClass, j.statementId) }
+  }
+
+  private def calculateNumberToRemove(dataSize: Long, retainedSize: Long): 
Long = {
+    if (dataSize > retainedSize) {
+      math.max(retainedSize / 10L, dataSize - retainedSize)
+    } else {
+      0L
+    }
+  }
+
+}
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
index 77414db..8e32b53 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
@@ -30,7 +30,6 @@ import org.apache.kyuubi.Logging
 import org.apache.kyuubi.Utils.stringifyException
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
-import org.apache.kyuubi.engine.spark.events.{EngineEventsStore, SessionEvent, 
SparkOperationEvent}
 import org.apache.kyuubi.service.{Serverable, ServiceState}
 
 /**
@@ -38,9 +37,7 @@ import org.apache.kyuubi.service.{Serverable, ServiceState}
  *
  * @param server the corresponding engine
  */
-class SparkSQLEngineListener(
-    server: Serverable,
-    store: EngineEventsStore) extends SparkListener with Logging {
+class SparkSQLEngineListener(server: Serverable) extends SparkListener with 
Logging {
 
   // the conf of server is null before initialized, use lazy val here
   private lazy val deregisterExceptions: Seq[String] =
@@ -116,19 +113,4 @@ class SparkSQLEngineListener(
     case e => e
   }
 
-  override def onOtherEvent(event: SparkListenerEvent): Unit = {
-    event match {
-      case e: SessionEvent => updateSessionStore(e)
-      case e: SparkOperationEvent => updateStatementStore(e)
-      case _ => // Ignore
-    }
-  }
-
-  private def updateSessionStore(event: SessionEvent): Unit = {
-    store.saveSession(event)
-  }
-
-  private def updateStatementStore(event: SparkOperationEvent): Unit = {
-    store.saveStatement(event)
-  }
 }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala
index b18888f..d7b578c 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala
@@ -33,7 +33,7 @@ import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
 import org.apache.kyuubi.engine.spark.events.{SessionEvent, 
SparkOperationEvent}
 
 case class EnginePage(parent: EngineTab) extends WebUIPage("") {
-  private val store = parent.engine.store
+  private val store = parent.store
 
   override def render(request: HttpServletRequest): Seq[Node] = {
     val content =
@@ -42,8 +42,8 @@ case class EnginePage(parent: EngineTab) extends 
WebUIPage("") {
         stop(request) ++
         <br/> ++
         <h4>
-        {parent.engine.backendService.sessionManager.getOpenSessionCount} 
session(s) are online,
-        running 
{parent.engine.backendService.sessionManager.operationManager.getOperationCount}
+        {store.getSessionCount} session(s) are online,
+        running {store.getStatementCount}
         operations
       </h4> ++
         generateSessionStatsTable(request) ++
@@ -52,7 +52,7 @@ case class EnginePage(parent: EngineTab) extends 
WebUIPage("") {
   }
 
   private def generateBasicStats(): Seq[Node] = {
-    val timeSinceStart = System.currentTimeMillis() - 
parent.engine.getStartTime
+    val timeSinceStart = parent.endTime() - parent.startTime
     <ul class ="list-unstyled">
       <li>
         <strong>Kyuubi Version: </strong>
@@ -60,24 +60,32 @@ case class EnginePage(parent: EngineTab) extends 
WebUIPage("") {
       </li>
       <li>
         <strong>Started at: </strong>
-        {new Date(parent.engine.getStartTime)}
-      </li>
-      <li>
-        <strong>Latest Logout at: </strong>
-        {new 
Date(parent.engine.backendService.sessionManager.latestLogoutTime)}
+        {new Date(parent.startTime)}
       </li>
+    {
+      parent.engine.map { engine =>
+        <li>
+            <strong>Latest Logout at: </strong>
+            {new Date(engine.backendService.sessionManager.latestLogoutTime)}
+          </li>
+      }.getOrElse(Seq.empty)
+    }
       <li>
         <strong>Time since start: </strong>
         {formatDurationVerbose(timeSinceStart)}
       </li>
-      <li>
-        <strong>Background execution pool threads alive: </strong>
-        {parent.engine.backendService.sessionManager.getExecPoolSize}
-      </li>
-      <li>
-        <strong>Background execution pool threads active: </strong>
-        {parent.engine.backendService.sessionManager.getActiveCount}
-      </li>
+    {
+      parent.engine.map { engine =>
+        <li>
+          <strong>Background execution pool threads alive: </strong>
+          {engine.backendService.sessionManager.getExecPoolSize}
+        </li>
+        <li>
+          <strong>Background execution pool threads active: </strong>
+          {engine.backendService.sessionManager.getActiveCount}
+        </li>
+      }.getOrElse(Seq.empty)
+    }
     </ul>
   }
 
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala
index 5d21459..18c12e3 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala
@@ -29,7 +29,7 @@ import org.apache.spark.util.Utils
 /** Page for Spark Web UI that shows statistics of jobs running in the engine 
server */
 case class EngineSessionPage(parent: EngineTab)
   extends WebUIPage("session") with Logging {
-  val store = parent.engine.store
+  val store = parent.store
 
   /** Render the page */
   def render(request: HttpServletRequest): Seq[Node] = {
@@ -55,23 +55,30 @@ case class EngineSessionPage(parent: EngineTab)
   }
 
   /** Generate basic stats of the engine server */
-  private def generateBasicStats(): Seq[Node] = {
-    val timeSinceStart = System.currentTimeMillis() - 
parent.engine.getStartTime
-    <ul class ="list-unstyled">
-      <li>
-        <strong>Started at: </strong>
-        {new Date(parent.engine.getStartTime)}
-      </li>
-      <li>
-        <strong>Latest Logout at: </strong>
-        {new 
Date(parent.engine.backendService.sessionManager.latestLogoutTime)}
-      </li>
-      <li>
-        <strong>Time since start: </strong>
-        {formatDurationVerbose(timeSinceStart)}
-      </li>
-    </ul>
-  }
+  private def generateBasicStats(): Seq[Node] =
+    if (parent.engine.isDefined) {
+      val timeSinceStart = parent.endTime() - parent.startTime
+      <ul class ="list-unstyled">
+        <li>
+          <strong>Started at: </strong>
+          {new Date(parent.startTime)}
+        </li>
+      {
+        parent.engine.map { engine =>
+          <li>
+              <strong>Latest Logout at: </strong>
+              {new Date(engine.backendService.sessionManager.latestLogoutTime)}
+            </li>
+        }.getOrElse(Seq.empty)
+      }
+        <li>
+          <strong>Time since start: </strong>
+          {formatDurationVerbose(timeSinceStart)}
+        </li>
+      </ul>
+    } else {
+      Seq.empty
+    }
 
   /** Generate stats of batch statements of the engine server */
   private def generateSQLStatsTable(request: HttpServletRequest, sessionID: 
String): Seq[Node] = {
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineTab.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineTab.scala
index 0ebb5d1..e0c6e1a 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineTab.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineTab.scala
@@ -24,25 +24,42 @@ import scala.util.control.NonFatal
 import org.apache.kyuubi.{Logging, Utils}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.spark.SparkSQLEngine
+import org.apache.kyuubi.engine.spark.events.EngineEventsStore
 import org.apache.kyuubi.service.ServiceState
 
 /**
  * Note that [[SparkUITab]] is private for Spark
  */
-case class EngineTab(engine: SparkSQLEngine)
-  extends SparkUITab(engine.spark.sparkContext.ui.orNull, "kyuubi") with 
Logging {
+case class EngineTab(
+    engine: Option[SparkSQLEngine],
+    sparkUI: Option[SparkUI],
+    store: EngineEventsStore,
+    kyuubiConf: KyuubiConf)
+  extends SparkUITab(sparkUI.orNull, "kyuubi") with Logging {
 
   override val name: String = "Kyuubi Query Engine"
-  val killEnabled = engine.getConf.get(KyuubiConf.ENGINE_UI_STOP_ENABLED)
+  val killEnabled = kyuubiConf.get(KyuubiConf.ENGINE_UI_STOP_ENABLED)
 
-  engine.spark.sparkContext.ui.foreach { ui =>
+  val startTime = engine.map(_.getStartTime).getOrElse {
+    sparkUI
+      .map(ui => ui.store.applicationInfo().attempts.head.startTime.getTime)
+      .getOrElse(0L)
+  }
+
+  def endTime(): Long = engine.map(_ => System.currentTimeMillis()).getOrElse {
+    sparkUI
+      .map(ui => ui.store.applicationInfo().attempts.head.endTime.getTime)
+      .getOrElse(0L)
+  }
+
+  sparkUI.foreach { ui =>
     this.attachPage(EnginePage(this))
     this.attachPage(EngineSessionPage(this))
     ui.attachTab(this)
     Utils.addShutdownHook(() => ui.detachTab(this))
   }
 
-  engine.spark.sparkContext.ui.foreach { ui =>
+  sparkUI.foreach { ui =>
     try {
       // Spark shade the jetty package so here we use reflect
       Class.forName("org.apache.spark.ui.SparkUI")
@@ -68,8 +85,8 @@ case class EngineTab(engine: SparkSQLEngine)
   }
 
   def handleKillRequest(request: HttpServletRequest): Unit = {
-    if (killEnabled && engine != null && engine.getServiceState != ServiceState.STOPPED) {
-      engine.stop()
+    if (killEnabled && engine.isDefined && engine.get.getServiceState != ServiceState.STOPPED) {
+      engine.get.stop()
     }
   }
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/KyuubiHistoryServerPlugin.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/KyuubiHistoryServerPlugin.scala
new file mode 100644
index 0000000..f2eb96d
--- /dev/null
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/KyuubiHistoryServerPlugin.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import org.apache.spark.SparkConf
+import org.apache.spark.kyuubi.SparkSQLEngineEventListener
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore}
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.spark.events.EngineEventsStore
+
+// scalastyle:off line.size.limit
+/**
+ * HistoryServer plugin for Kyuubi, It can be used as a plugin in SparkHistoryServer to make SparkHistoryServer UI display Kyuubi's Tab.
+ * We can use it like:
+ *  - Copy the kyuubi-spark-sql-engine jar to $SPARK_HOME/jars and restart SparkHistoryServer.
+ *  - In addition, we can add kyuubi configurations to spark-defaults.conf prefixed with "spark.kyuubi.".
+ */
+// scalastyle:on line.size.limit
+class KyuubiHistoryServerPlugin extends AppHistoryServerPlugin {
+
+  override def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] = {
+    val kyuubiConf = mergedKyuubiConf(conf)
+    Seq(new SparkSQLEngineEventListener(store, kyuubiConf))
+  }
+
+  override def setupUI(ui: SparkUI): Unit = {
+    val kyuubiConf = mergedKyuubiConf(ui.conf)
+    val store = new EngineEventsStore(ui.store.store)
+    if (store.getSessionCount > 0) {
+      EngineTab(
+        None,
+        Some(ui),
+        store,
+        kyuubiConf)
+    }
+  }
+
+  private def mergedKyuubiConf(sparkConf: SparkConf): KyuubiConf = {
+    val kyuubiConf = KyuubiConf()
+    val sparkToKyuubiPrefix = "spark.kyuubi."
+    sparkConf.getAllWithPrefix(sparkToKyuubiPrefix).foreach { case (k, v) =>
+      kyuubiConf.set(s"kyuubi.$k", v)
+    }
+    kyuubiConf
+  }
+
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala
index 881ff0e..306e669 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala
@@ -17,206 +17,73 @@
 
 package org.apache.kyuubi.engine.spark.events
 
-import org.apache.kyuubi.KyuubiFunSuite
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_UI_SESSION_LIMIT, ENGINE_UI_STATEMENT_LIMIT}
+import org.apache.spark.kyuubi.SparkContextHelper
 
-class EngineEventsStoreSuite extends KyuubiFunSuite {
+import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
 
-  test("ensure that the sessions are stored in order") {
-    val store = new EngineEventsStore(KyuubiConf())
-class EngineEventsStoreSuite extends KyuubiFunSuite {
+class EngineEventsStoreSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
 
-    val s1 = SessionEvent("a", "ea", "test1", "1.1.1.1", "1.1.1.2", 1L)
-    val s2 = SessionEvent("c", "ea", "test2", "1.1.1.1", "1.1.1.2", 3L)
-    val s3 = SessionEvent("b", "ea", "test3", "1.1.1.1", "1.1.1.2", 2L)
+  var store: EngineEventsStore = _
 
-    store.saveSession(s1)
-    store.saveSession(s2)
-    store.saveSession(s3)
-
-    assert(store.getSessionList.size == 3)
-    assert(store.getSessionList.head.sessionId == "a")
-    assert(store.getSessionList.last.sessionId == "c")
-  }
-
-  test("test drop sessions when reach the threshold ") {
-    val conf = KyuubiConf()
-    conf.set(ENGINE_UI_SESSION_LIMIT, 3)
-
-    val store = new EngineEventsStore(conf)
-    for (i <- 1 to 5) {
-      val s = SessionEvent(s"b$i", "ea", s"test$i", "1.1.1.1", "1.1.1.2", 2L)
-      store.saveSession(s)
-    }
-
-    assert(store.getSessionList.size == 3)
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    startSparkEngine()
+    val kvStore = SparkContextHelper.getKvStore(spark.sparkContext)
+    store = new EngineEventsStore(kvStore)
   }
 
-  test("test drop sessions when reach the threshold, and try to keep active events.") {
-    val conf = KyuubiConf()
-    conf.set(ENGINE_UI_SESSION_LIMIT, 3)
-
-    val store = new EngineEventsStore(conf)
-
-    store.saveSession(SessionEvent("s1", "ea", "test1", "1.1.1.1", "1.1.1.2", 
1L, -1L))
-    store.saveSession(SessionEvent("s2", "ea", "test1", "1.1.1.1", "1.1.1.2", 
2L, -1L))
-    store.saveSession(SessionEvent("s3", "ea", "test1", "1.1.1.1", "1.1.1.2", 
3L, 1L))
-    store.saveSession(SessionEvent("s4", "ea", "test1", "1.1.1.1", "1.1.1.2", 
4L, -1L))
-
-    assert(store.getSessionList.size == 3)
-    assert(store.getSessionList(2).sessionId == "s4")
+  override protected def afterEach(): Unit = {
+    super.afterEach()
+    stopSparkEngine()
   }
 
-  test("test check session after update session") {
-    val store = new EngineEventsStore(KyuubiConf())
-    val s = SessionEvent("abc", "ea", "test3", "1.1.1.1", "1.1.1.2", 2L)
-    store.saveSession(s)
-
-    val finishTimestamp: Long = 456L
-    s.endTime = finishTimestamp
-    store.saveSession(s)
-
-    assert(store.getSession("abc").get.endTime == finishTimestamp)
-  }
-
-  test("ensure that the statements are stored in order") {
-    val store = new EngineEventsStore(KyuubiConf())
-
-    val s1 = SparkOperationEvent(
-      "ea1",
-      "select 1",
-      true,
-      "RUNNING",
-      1L,
-      1L,
-      1L,
-      2L,
-      None,
-      "sid1",
-      "a",
-      None)
-    val s2 = SparkOperationEvent(
-      "ea2",
-      "select 2",
-      true,
-      "RUNNING",
-      2L,
-      2L,
-      2L,
-      4L,
-      None,
-      "sid1",
-      "c",
-      None)
-    val s3 = SparkOperationEvent(
-      "ea3",
-      "select 3",
-      true,
-      "RUNNING",
-      3L,
-      3L,
-      3L,
-      6L,
-      None,
-      "sid1",
-      "b",
-      None)
-
-    store.saveStatement(s1)
-    store.saveStatement(s2)
-    store.saveStatement(s3)
-
-    assert(store.getStatementList.size == 3)
-    assert(store.getStatementList.head.statementId == "ea1")
-    assert(store.getStatementList.last.statementId == "ea3")
+  test("EngineEventsStore session test") {
+    assert(store.getSessionList.isEmpty)
+    assert(store.getSessionCount == 0)
+    withJdbcStatement() { statement =>
+      statement.execute(
+        """
+          |SELECT
+          |  l.id % 100 k,
+          |  sum(l.id) sum,
+          |  count(l.id) cnt,
+          |  avg(l.id) avg,
+          |  min(l.id) min,
+          |  max(l.id) max
+          |from range(0, 100000L, 1, 100) l
+          |  left join range(0, 100000L, 2, 100) r ON l.id = r.id
+          |GROUP BY 1""".stripMargin)
+    }
+    assert(store.getSessionList.size == 1)
+    assert(store.getSessionCount == 1)
   }
 
-  test("test drop statements when reach the threshold ") {
-    val conf = KyuubiConf()
-    conf.set(ENGINE_UI_STATEMENT_LIMIT, 3)
-
-    val store = new EngineEventsStore(conf)
-    for (i <- 1 to 5) {
-      val s = SparkOperationEvent(
-        s"ea1${i}",
-        "select 1",
-        true,
-        "RUNNING",
-        1L,
-        1L,
-        1L,
-        2L,
-        None,
-        "sid1",
-        "a",
-        None)
-      store.saveStatement(s)
+  test("EngineEventsStore statement test") {
+    assert(store.getStatementList.isEmpty)
+    assert(store.getStatementCount == 0)
+    val sql = """
+                |SELECT
+                |  l.id % 100 k,
+                |  sum(l.id) sum,
+                |  count(l.id) cnt,
+                |  avg(l.id) avg,
+                |  min(l.id) min,
+                |  max(l.id) max
+                |from range(0, 100000L, 1, 100) l
+                |  left join range(0, 100000L, 2, 100) r ON l.id = r.id
+                |GROUP BY 1""".stripMargin
+    withJdbcStatement() { statement =>
+      statement.execute(sql)
     }
-
-    assert(store.getStatementList.size == 3)
+    val statementList = store.getStatementList
+    assert(statementList.size == 1)
+    assert(store.getStatementCount == 1)
+    val statementId = statementList(0).statementId
+    assert(store.getStatement(statementId).get.statement === sql)
   }
 
-  test("test drop statements when reach the threshold, and try to keep active events.") {
-    val conf = KyuubiConf()
-    conf.set(ENGINE_UI_STATEMENT_LIMIT, 3)
-
-    val store = new EngineEventsStore(conf)
-
-    store.saveStatement(SparkOperationEvent(
-      "s1",
-      "select 1",
-      true,
-      "RUNNING",
-      1L,
-      1L,
-      1L,
-      -1L,
-      None,
-      "sid1",
-      "a",
-      None))
-    store.saveStatement(SparkOperationEvent(
-      "s2",
-      "select 1",
-      true,
-      "RUNNING",
-      2L,
-      2L,
-      2L,
-      -1L,
-      None,
-      "sid1",
-      "a",
-      None))
-    store.saveStatement(SparkOperationEvent(
-      "s3",
-      "select 1",
-      true,
-      "RUNNING",
-      3L,
-      3L,
-      3L,
-      3L,
-      None,
-      "sid1",
-      "a",
-      None))
-    store.saveStatement(SparkOperationEvent(
-      "s4",
-      "select 1",
-      true,
-      "RUNNING",
-      4L,
-      4L,
-      4L,
-      -1L,
-      None,
-      "sid1",
-      "a",
-      None))
-
-    assert(store.getStatementList.size == 3)
-    assert(store.getStatementList(2).statementId == "s4")
-  }
+  override def withKyuubiConf: Map[String, String] = Map.empty
 
+  override protected def jdbcUrl: String = getJdbcUrl
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala
index ca90d92..3803bd0 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala
@@ -17,12 +17,16 @@
 
 package org.apache.spark.ui
 
+import scala.collection.JavaConverters._
+
 import org.apache.http.client.methods.HttpGet
 import org.apache.http.impl.client.HttpClients
 import org.apache.http.util.EntityUtils
 import org.apache.spark.SparkContext
+import org.apache.spark.kyuubi.SparkContextHelper
 
 import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
+import org.apache.kyuubi.engine.spark.events.{SessionEvent, SparkOperationEvent}
 import org.apache.kyuubi.operation.HiveJDBCTestHelper
 
 class EngineTabSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
@@ -36,6 +40,16 @@ class EngineTabSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
     super.beforeAll()
   }
 
+  override protected def beforeEach(): Unit = {
+    val kvstore = SparkContextHelper.getKvStore(spark.sparkContext)
+    kvstore.view(classOf[SessionEvent]).closeableIterator().asScala.foreach(j => {
+      kvstore.delete(j.getClass, j.sessionId)
+    })
+    kvstore.view(classOf[SparkOperationEvent]).closeableIterator().asScala.foreach(j => {
+      kvstore.delete(j.getClass, j.statementId)
+    })
+  }
+
   test("basic stats for engine tab") {
     assert(spark.sparkContext.ui.nonEmpty)
     val client = HttpClients.createDefault()

Reply via email to