This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 22e6432 [KYUUBI #1022] Add basic EngineStatusStore for events
22e6432 is described below
commit 22e6432e4dcf1123af4c6e0f1f76115b4def38de
Author: timothy65535 <[email protected]>
AuthorDate: Sun Sep 26 10:40:26 2021 +0800
[KYUUBI #1022] Add basic EngineStatusStore for events
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
For more detail, please go to
https://github.com/apache/incubator-kyuubi/issues/981
`EngineStatusStore` helps to push events to the listener bus.
`EngineStatusStore` is an in-memory store that tracks the number of
statements and sessions. It provides:
- storage of all elements, sorted by startTimestamp.
- cleanup of the last elements when a certain threshold is reached.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1023 from timothy65535/ky-1022.
Closes #1022
b9f355c4 [timothy65535] [KYUUBI #1022] Add basic EngineStatusStore for
events
Authored-by: timothy65535 <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
docs/deployment/settings.md | 1 +
.../kyuubi/engine/spark/SparkSQLEngine.scala | 4 +-
.../engine/spark/events/EngineEventsStore.scala | 89 ++++++++++++++++++++++
.../spark/kyuubi/SparkSQLEngineListener.scala | 16 +++-
.../spark/events/EngineEventsStoreSuite.scala | 82 ++++++++++++++++++++
.../org/apache/kyuubi/config/KyuubiConf.scala | 8 ++
6 files changed, 197 insertions(+), 3 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index d37c53c..43050b5 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -187,6 +187,7 @@ kyuubi\.engine\.share<br>\.level|<div style='width:
65pt;word-wrap: break-word;w
kyuubi\.engine\.share<br>\.level\.sub\.domain|<div style='width:
65pt;word-wrap: break-word;white-space: normal'><undefined></div>|<div
style='width: 170pt;word-wrap: break-word;white-space: normal'>(deprecated) -
Using kyuubi.engine.share.level.subdomain instead</div>|<div style='width:
30pt'>string</div>|<div style='width: 20pt'>1.2.0</div>
kyuubi\.engine\.share<br>\.level\.subdomain|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'><undefined></div>|<div style='width:
170pt;word-wrap: break-word;white-space: normal'>Allow end-users to create a
subdomain for the share level of an engine. A subdomain is a case-insensitive
string values in `^[a-zA-Z_-]{1,14}$` form. For example, for `USER` share
level, an end-user can share a certain engine within a subdomain, not for all
of its clients. End-users are f [...]
kyuubi\.engine\.single<br>\.spark\.session|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>false</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>When set to true, this engine is running in a
single session mode. All the JDBC/ODBC connections share the temporary views,
function registries, SQL configuration and the current database.</div>|<div
style='width: 30pt'>boolean</div>|<div style='width: 20pt'>1.3.0</div>
+kyuubi\.engine\.ui<br>\.retainedSessions|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>200</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>The number of SQL client sessions kept in the
Kyuubi Query Engine web UI.</div>|<div style='width: 30pt'>int</div>|<div
style='width: 20pt'>1.4.0</div>
kyuubi\.engine\.ui\.stop<br>\.enabled|<div style='width: 65pt;word-wrap:
break-word;white-space: normal'>true</div>|<div style='width: 170pt;word-wrap:
break-word;white-space: normal'>When true, allows Kyuubi engine to be killed
from the Spark Web UI.</div>|<div style='width: 30pt'>boolean</div>|<div
style='width: 20pt'>1.3.0</div>
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index a25b3db..de160d0 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -30,7 +30,7 @@ import org.apache.kyuubi.Utils._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch
-import org.apache.kyuubi.engine.spark.events.{EngineEvent, EventLoggingService}
+import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore,
EventLoggingService}
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.RetryPolicies
import org.apache.kyuubi.service.{Serverable, ServiceState}
@@ -45,7 +45,7 @@ case class SparkSQLEngine(spark: SparkSession) extends
Serverable("SparkSQLEngin
override val frontendServices = Seq(new
SparkThriftBinaryFrontendService(this))
override def initialize(conf: KyuubiConf): Unit = {
- val listener = new SparkSQLEngineListener(this)
+ val listener = new SparkSQLEngineListener(this, new
EngineEventsStore(conf))
spark.sparkContext.addSparkListener(listener)
addService(eventLogging)
super.initialize(conf)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala
new file mode 100644
index 0000000..dd52635
--- /dev/null
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.events
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_UI_SESSION_LIMIT
+
+/**
+ * A memory store that tracking the number of statements and sessions, it
provides:
+ *
+ * - stores all events.
+ * - cleanup the events when reach a certain threshold:
+ * 1). remove the finished events first.
+ * 2). remove the active events if still reach the threshold.
+ *
+ * // TODO KYUUBI #983 this store will be used in the third task.
+ */
+class EngineEventsStore(conf: KyuubiConf) {
+
+ /**
+ * The number of SQL client sessions kept in the Kyuubi Query Engine web UI.
+ */
+ private val retainedSessions: Int = conf.get(ENGINE_UI_SESSION_LIMIT)
+
+ /**
+ * store all session events.
+ */
+ val sessions = new ConcurrentHashMap[String, SessionEvent]
+
+ /**
+ * get all session events order by startTime
+ */
+ def getSessionList: Seq[SessionEvent] = {
+ sessions.values().asScala.toSeq.sortBy(_.startTime)
+ }
+
+ def getSession(sessionId: String): Option[SessionEvent] = {
+ Option(sessions.get(sessionId))
+ }
+
+ /**
+ * save session events and check the capacity threshold
+ */
+ def saveSession(sessionEvent: SessionEvent): Unit = {
+ sessions.put(sessionEvent.sessionId, sessionEvent)
+ checkSessionCapacity()
+ }
+
+ /**
+ * cleanup the session events if reach the threshold
+ */
+ private def checkSessionCapacity(): Unit = {
+ var countToDelete = sessions.size - retainedSessions
+
+ val reverseSeq =
sessions.values().asScala.toSeq.sortBy(_.startTime).reverse
+
+ // remove finished sessions first.
+ for (event <- reverseSeq if event.endTime != 0L && countToDelete > 0) {
+ sessions.remove(event.sessionId)
+ countToDelete -= 1
+ }
+
+ // remove active event if still reach the threshold
+ for (event <- reverseSeq if countToDelete > 0) {
+ sessions.remove(event.sessionId)
+ countToDelete -= 1
+ }
+ }
+}
+
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
index e22967b..f29bcdc 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
@@ -30,6 +30,7 @@ import
org.apache.kyuubi.KyuubiSparkUtils.KYUUBI_STATEMENT_ID_KEY
import org.apache.kyuubi.Logging
import org.apache.kyuubi.Utils.stringifyException
import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.spark.events.{EngineEventsStore, SessionEvent}
import org.apache.kyuubi.engine.spark.monitor.KyuubiStatementMonitor
import org.apache.kyuubi.engine.spark.monitor.entity.KyuubiJobInfo
import org.apache.kyuubi.service.{Serverable, ServiceState}
@@ -39,7 +40,9 @@ import org.apache.kyuubi.service.{Serverable, ServiceState}
*
* @param server the corresponding engine
*/
-class SparkSQLEngineListener(server: Serverable) extends SparkListener with
Logging {
+class SparkSQLEngineListener(
+ server: Serverable,
+ store: EngineEventsStore) extends SparkListener with Logging {
// the conf of server is null before initialized, use lazy val here
private lazy val deregisterExceptions: Seq[String] =
@@ -117,4 +120,15 @@ class SparkSQLEngineListener(server: Serverable) extends
SparkListener with Logg
if e.getCause != null => findCause(e.getCause)
case e => e
}
+
+ override def onOtherEvent(event: SparkListenerEvent): Unit = {
+ event match {
+ case e: SessionEvent => updateSession(e)
+ case _ => // Ignore
+ }
+ }
+
+ private def updateSession(event: SessionEvent): Unit = {
+ store.saveSession(event)
+ }
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala
new file mode 100644
index 0000000..46da9c7
--- /dev/null
+++
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.spark.events
+
+import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.ENGINE_UI_SESSION_LIMIT
+
+class EngineEventsStoreSuite extends KyuubiFunSuite {
+
+ test("ensure that the sessions are stored in order") {
+ val store = new EngineEventsStore(KyuubiConf())
+
+ val s1 = SessionEvent("a", "ea", "test1", "1.1.1.1", 1L)
+ val s2 = SessionEvent("c", "ea", "test2", "1.1.1.1", 3L)
+ val s3 = SessionEvent("b", "ea", "test3", "1.1.1.1", 2L)
+
+ store.saveSession(s1)
+ store.saveSession(s2)
+ store.saveSession(s3)
+
+ assert(store.getSessionList.size == 3)
+ assert(store.getSessionList.head.sessionId == "a")
+ assert(store.getSessionList.last.sessionId == "c")
+ }
+
+ test("test drop sessions when reach the threshold ") {
+ val conf = KyuubiConf()
+ conf.set(ENGINE_UI_SESSION_LIMIT, 3)
+
+ val store = new EngineEventsStore(conf)
+ for (i <- 1 to 5) {
+ val s = SessionEvent(s"b$i", "ea", s"test$i", "1.1.1.1", 2L)
+ store.saveSession(s)
+ }
+
+ assert(store.getSessionList.size == 3)
+ }
+
+ test("test drop sessions when reach the threshold, and try to keep active
events.") {
+ val conf = KyuubiConf()
+ conf.set(ENGINE_UI_SESSION_LIMIT, 3)
+
+ val store = new EngineEventsStore(conf)
+
+ store.saveSession(SessionEvent("s1", "ea", "test1", "1.1.1.1", 1L, 0L))
+ store.saveSession(SessionEvent("s2", "ea", "test1", "1.1.1.1", 2L, 0L))
+ store.saveSession(SessionEvent("s3", "ea", "test1", "1.1.1.1", 3L, 1L))
+ store.saveSession(SessionEvent("s4", "ea", "test1", "1.1.1.1", 4L, 0L))
+
+ assert(store.getSessionList.size == 3)
+ assert(store.getSessionList(2).sessionId == "s4")
+ }
+
+ test("test check session after update session") {
+ val store = new EngineEventsStore(KyuubiConf())
+ val s = SessionEvent("abc", "ea", "test3", "1.1.1.1", 2L)
+ store.saveSession(s)
+
+ val finishTimestamp: Long = 456L
+ s.endTime = finishTimestamp
+ store.saveSession(s)
+
+ assert(store.getSession("abc").get.endTime == finishTimestamp)
+ }
+
+}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 6df9cac..1d583b0 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -877,6 +877,14 @@ object KyuubiConf {
.booleanConf
.createWithDefault(true)
+ val ENGINE_UI_SESSION_LIMIT: ConfigEntry[Int] =
+ buildConf("engine.ui.retainedSessions")
+ .doc("The number of SQL client sessions kept in the Kyuubi Query Engine
web UI.")
+ .version("1.4.0")
+ .intConf
+ .checkValue(_ > 0, "retained sessions must be positive.")
+ .createWithDefault(200)
+
val SESSION_NAME: OptionalConfigEntry[String] =
buildConf("session.name")
.doc("A human readable name of session and we use empty string by
default. " +