This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 22e6432  [KYUUBI #1022] Add basic EngineStatusStore for events
22e6432 is described below

commit 22e6432e4dcf1123af4c6e0f1f76115b4def38de
Author: timothy65535 <[email protected]>
AuthorDate: Sun Sep 26 10:40:26 2021 +0800

    [KYUUBI #1022] Add basic EngineStatusStore for events
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    
    For more detail, please go to 
https://github.com/apache/incubator-kyuubi/issues/981
    
    `EngineStatusStore` helps to push events to listener bus
    
    `EngineStatusStore` is a memory store that tracks the number of
statements and sessions. It provides:
    - stores all elements, sorted by startTimestamp.
    - cleans up the last elements when a certain threshold is reached.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #1023 from timothy65535/ky-1022.
    
    Closes #1022
    
    b9f355c4 [timothy65535] [KYUUBI #1022] Add basic EngineStatusStore for 
events
    
    Authored-by: timothy65535 <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 docs/deployment/settings.md                        |  1 +
 .../kyuubi/engine/spark/SparkSQLEngine.scala       |  4 +-
 .../engine/spark/events/EngineEventsStore.scala    | 89 ++++++++++++++++++++++
 .../spark/kyuubi/SparkSQLEngineListener.scala      | 16 +++-
 .../spark/events/EngineEventsStoreSuite.scala      | 82 ++++++++++++++++++++
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  8 ++
 6 files changed, 197 insertions(+), 3 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index d37c53c..43050b5 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -187,6 +187,7 @@ kyuubi\.engine\.share<br>\.level|<div style='width: 
65pt;word-wrap: break-word;w
 kyuubi\.engine\.share<br>\.level\.sub\.domain|<div style='width: 
65pt;word-wrap: break-word;white-space: normal'>&lt;undefined&gt;</div>|<div 
style='width: 170pt;word-wrap: break-word;white-space: normal'>(deprecated) - 
Using kyuubi.engine.share.level.subdomain instead</div>|<div style='width: 
30pt'>string</div>|<div style='width: 20pt'>1.2.0</div>
 kyuubi\.engine\.share<br>\.level\.subdomain|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>&lt;undefined&gt;</div>|<div style='width: 
170pt;word-wrap: break-word;white-space: normal'>Allow end-users to create a 
subdomain for the share level of an engine. A subdomain is a case-insensitive 
string values in `^[a-zA-Z_-]{1,14}$` form. For example, for `USER` share 
level, an end-user can share a certain engine within a subdomain, not for all 
of its clients. End-users are f [...]
 kyuubi\.engine\.single<br>\.spark\.session|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>false</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>When set to true, this engine is running in a 
single session mode. All the JDBC/ODBC connections share the temporary views, 
function registries, SQL configuration and the current database.</div>|<div 
style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.3.0</div>
+kyuubi\.engine\.ui<br>\.retainedSessions|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>200</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>The number of SQL client sessions kept in the 
Kyuubi Query Engine web UI.</div>|<div style='width: 30pt'>int</div>|<div 
style='width: 20pt'>1.4.0</div>
 kyuubi\.engine\.ui\.stop<br>\.enabled|<div style='width: 65pt;word-wrap: 
break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap: 
break-word;white-space: normal'>When true, allows Kyuubi engine to be killed 
from the Spark Web UI.</div>|<div style='width: 30pt'>boolean</div>|<div 
style='width: 20pt'>1.3.0</div>
 
 
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index a25b3db..de160d0 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -30,7 +30,7 @@ import org.apache.kyuubi.Utils._
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch
-import org.apache.kyuubi.engine.spark.events.{EngineEvent, EventLoggingService}
+import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, 
EventLoggingService}
 import org.apache.kyuubi.ha.HighAvailabilityConf._
 import org.apache.kyuubi.ha.client.RetryPolicies
 import org.apache.kyuubi.service.{Serverable, ServiceState}
@@ -45,7 +45,7 @@ case class SparkSQLEngine(spark: SparkSession) extends 
Serverable("SparkSQLEngin
   override val frontendServices = Seq(new 
SparkThriftBinaryFrontendService(this))
 
   override def initialize(conf: KyuubiConf): Unit = {
-    val listener = new SparkSQLEngineListener(this)
+    val listener = new SparkSQLEngineListener(this, new 
EngineEventsStore(conf))
     spark.sparkContext.addSparkListener(listener)
     addService(eventLogging)
     super.initialize(conf)
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala
new file mode 100644
index 0000000..dd52635
--- /dev/null
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.events
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_UI_SESSION_LIMIT
+
+/**
+ * A memory store that tracks the number of statements and sessions. It 
provides:
+ *
+ * - stores all events.
+ * - clean up the events when a certain threshold is reached:
+ * 1). remove the finished events first.
+ * 2). remove the active events if the count still exceeds the threshold.
+ *
+ * // TODO KYUUBI #983 this store will be used in the third task.
+ */
+class EngineEventsStore(conf: KyuubiConf) {
+
+  /**
+   * The number of SQL client sessions kept in the Kyuubi Query Engine web UI.
+   */
+  private val retainedSessions: Int = conf.get(ENGINE_UI_SESSION_LIMIT)
+
+  /**
+   * store all session events.
+   */
+  val sessions = new ConcurrentHashMap[String, SessionEvent]
+
+  /**
+   * get all session events order by startTime
+   */
+  def getSessionList: Seq[SessionEvent] = {
+    sessions.values().asScala.toSeq.sortBy(_.startTime)
+  }
+
+  def getSession(sessionId: String): Option[SessionEvent] = {
+    Option(sessions.get(sessionId))
+  }
+
+  /**
+   * save session events and check the capacity threshold
+   */
+  def saveSession(sessionEvent: SessionEvent): Unit = {
+    sessions.put(sessionEvent.sessionId, sessionEvent)
+    checkSessionCapacity()
+  }
+
+  /**
+   * clean up the session events if the threshold is reached
+   */
+  private def checkSessionCapacity(): Unit = {
+    var countToDelete = sessions.size - retainedSessions
+
+    val reverseSeq = 
sessions.values().asScala.toSeq.sortBy(_.startTime).reverse
+
+    // remove finished sessions first.
+    for (event <- reverseSeq if event.endTime != 0L && countToDelete > 0) {
+      sessions.remove(event.sessionId)
+      countToDelete -= 1
+    }
+
+    // remove active events if the count still exceeds the threshold
+    for (event <- reverseSeq if countToDelete > 0) {
+      sessions.remove(event.sessionId)
+      countToDelete -= 1
+    }
+  }
+}
+
diff --git 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
index e22967b..f29bcdc 100644
--- 
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
+++ 
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
@@ -30,6 +30,7 @@ import 
org.apache.kyuubi.KyuubiSparkUtils.KYUUBI_STATEMENT_ID_KEY
 import org.apache.kyuubi.Logging
 import org.apache.kyuubi.Utils.stringifyException
 import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.spark.events.{EngineEventsStore, SessionEvent}
 import org.apache.kyuubi.engine.spark.monitor.KyuubiStatementMonitor
 import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiJobInfo
 import org.apache.kyuubi.service.{Serverable, ServiceState}
@@ -39,7 +40,9 @@ import org.apache.kyuubi.service.{Serverable, ServiceState}
  *
  * @param server the corresponding engine
  */
-class SparkSQLEngineListener(server: Serverable) extends SparkListener with 
Logging {
+class SparkSQLEngineListener(
+    server: Serverable,
+    store: EngineEventsStore) extends SparkListener with Logging {
 
   // the conf of server is null before initialized, use lazy val here
   private lazy val deregisterExceptions: Seq[String] =
@@ -117,4 +120,15 @@ class SparkSQLEngineListener(server: Serverable) extends 
SparkListener with Logg
       if e.getCause != null => findCause(e.getCause)
     case e => e
   }
+
+  override def onOtherEvent(event: SparkListenerEvent): Unit = {
+    event match {
+      case e: SessionEvent => updateSession(e)
+      case _ => // Ignore
+    }
+  }
+
+  private def updateSession(event: SessionEvent): Unit = {
+    store.saveSession(event)
+  }
 }
diff --git 
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala
 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala
new file mode 100644
index 0000000..46da9c7
--- /dev/null
+++ 
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.events
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_UI_SESSION_LIMIT
+
+class EngineEventsStoreSuite extends KyuubiFunSuite {
+
+  test("ensure that the sessions are stored in order") {
+    val store = new EngineEventsStore(KyuubiConf())
+
+    val s1 = SessionEvent("a", "ea", "test1", "1.1.1.1", 1L)
+    val s2 = SessionEvent("c", "ea", "test2", "1.1.1.1", 3L)
+    val s3 = SessionEvent("b", "ea", "test3", "1.1.1.1", 2L)
+
+    store.saveSession(s1)
+    store.saveSession(s2)
+    store.saveSession(s3)
+
+    assert(store.getSessionList.size == 3)
+    assert(store.getSessionList.head.sessionId == "a")
+    assert(store.getSessionList.last.sessionId == "c")
+  }
+
+  test("test drop sessions when reach the threshold ") {
+    val conf = KyuubiConf()
+    conf.set(ENGINE_UI_SESSION_LIMIT, 3)
+
+    val store = new EngineEventsStore(conf)
+    for (i <- 1 to 5) {
+      val s = SessionEvent(s"b$i", "ea", s"test$i", "1.1.1.1", 2L)
+      store.saveSession(s)
+    }
+
+    assert(store.getSessionList.size == 3)
+  }
+
+  test("test drop sessions when reach the threshold, and try to keep active 
events.") {
+    val conf = KyuubiConf()
+    conf.set(ENGINE_UI_SESSION_LIMIT, 3)
+
+    val store = new EngineEventsStore(conf)
+
+    store.saveSession(SessionEvent("s1", "ea", "test1", "1.1.1.1", 1L, 0L))
+    store.saveSession(SessionEvent("s2", "ea", "test1", "1.1.1.1", 2L, 0L))
+    store.saveSession(SessionEvent("s3", "ea", "test1", "1.1.1.1", 3L, 1L))
+    store.saveSession(SessionEvent("s4", "ea", "test1", "1.1.1.1", 4L, 0L))
+
+    assert(store.getSessionList.size == 3)
+    assert(store.getSessionList(2).sessionId == "s4")
+  }
+
+  test("test check session after update session") {
+    val store = new EngineEventsStore(KyuubiConf())
+    val s = SessionEvent("abc", "ea", "test3", "1.1.1.1", 2L)
+    store.saveSession(s)
+
+    val finishTimestamp: Long = 456L
+    s.endTime = finishTimestamp
+    store.saveSession(s)
+
+    assert(store.getSession("abc").get.endTime == finishTimestamp)
+  }
+
+}
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 6df9cac..1d583b0 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -877,6 +877,14 @@ object KyuubiConf {
       .booleanConf
       .createWithDefault(true)
 
+  val ENGINE_UI_SESSION_LIMIT: ConfigEntry[Int] =
+    buildConf("engine.ui.retainedSessions")
+      .doc("The number of SQL client sessions kept in the Kyuubi Query Engine 
web UI.")
+      .version("1.4.0")
+      .intConf
+      .checkValue(_ > 0, "retained sessions must be positive.")
+      .createWithDefault(200)
+
   val SESSION_NAME: OptionalConfigEntry[String] =
     buildConf("session.name")
       .doc("A human readable name of session and we use empty string by 
default. " +

Reply via email to