This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new be6133a [KYUUBI #1816] Implement KyuubiHistoryServerPlugin
be6133a is described below
commit be6133ae99c3c702587e9eaf1ca63fea99aa72f9
Author: Wang Zhen <[email protected]>
AuthorDate: Thu Feb 10 09:38:13 2022 +0800
[KYUUBI #1816] Implement KyuubiHistoryServerPlugin
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
Implement KyuubiHistoryServerPlugin. #1816
### _How was this patch tested?_
- [X] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [X] Add screenshots for manual tests if appropriate
**SparkUI:**


**SparkHistoryServer:**


- [X] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1820 from wForget/KYUUBI-1816.
Closes #1816
a3256920 [Wang Zhen] fix test
256363d8 [Wang Zhen] fix
0aecc305 [Wang Zhen] load kyuubiConf from sparkConf
86608fa5 [wForget] fix tests
2481721b [wForget] fix
d4f5825d [wForget] Merge remote-tracking branch 'origin/master' into
KYUUBI-1816
18c18458 [wForget] fix checkstyle
c21f8b10 [wForget] [KYUUBI-1816] add AppHistoryServerPlugin services
0adf3634 [Wang Zhen] [KYUUBI-1816] Add KyuubiHistoryServerPlugin
8f5b1196 [Wang Zhen] [KYUUBI-1816] Refactor EngineEventsStore to get Events
through ElementTrackingStore.
678396d1 [Wang Zhen] [KYUUBI-1816] Add SparkSQLEngineEventListener
fcc1a925 [Wang Zhen] [KYUUBI-1816] Add the @KVIndex property to the event,
which will be used for KvStore
Lead-authored-by: Wang Zhen <[email protected]>
Co-authored-by: wForget <[email protected]>
Signed-off-by: ulysses-you <[email protected]>
---
.rat-excludes | 1 +
.../org.apache.spark.status.AppHistoryServerPlugin | 1 +
.../kyuubi/engine/spark/KyuubiSparkUtil.scala | 5 +
.../kyuubi/engine/spark/SparkSQLEngine.scala | 21 +-
.../engine/spark/events/EngineEventsStore.scala | 101 ++-------
.../kyuubi/engine/spark/events/SessionEvent.scala | 8 +-
.../engine/spark/events/SparkOperationEvent.scala | 8 +-
.../apache/spark/kyuubi/SparkContextHelper.scala | 10 +
.../spark/kyuubi/SparkSQLEngineEventListener.scala | 98 +++++++++
.../spark/kyuubi/SparkSQLEngineListener.scala | 20 +-
.../scala/org/apache/spark/ui/EnginePage.scala | 42 ++--
.../org/apache/spark/ui/EngineSessionPage.scala | 43 ++--
.../main/scala/org/apache/spark/ui/EngineTab.scala | 31 ++-
.../spark/ui/KyuubiHistoryServerPlugin.scala | 64 ++++++
.../spark/events/EngineEventsStoreSuite.scala | 243 +++++----------------
.../scala/org/apache/spark/ui/EngineTabSuite.scala | 14 ++
16 files changed, 369 insertions(+), 341 deletions(-)
diff --git a/.rat-excludes b/.rat-excludes
index 709e852..5194ffe 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -39,3 +39,4 @@ build/scala-*/**
**/*.output.schema
**/apache-kyuubi-*-bin*/**
**/benchmarks/**
+**/org.apache.spark.status.AppHistoryServerPlugin
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
b/externals/kyuubi-spark-sql-engine/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
new file mode 100644
index 0000000..2be4ed7
--- /dev/null
+++
b/externals/kyuubi-spark-sql-engine/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
@@ -0,0 +1 @@
+org.apache.spark.ui.KyuubiHistoryServerPlugin
\ No newline at end of file
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala
index 51752d0..3a708e0 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/KyuubiSparkUtil.scala
@@ -19,13 +19,18 @@ package org.apache.kyuubi.engine.spark
import java.time.{Instant, LocalDateTime, ZoneId}
+import scala.annotation.meta.getter
+
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.kvstore.KVIndex
import org.apache.kyuubi.Utils
object KyuubiSparkUtil {
+ type KVIndexParam = KVIndex @getter
+
final val SPARK_SCHEDULER_POOL_KEY = "spark.scheduler.pool"
final val SPARK_SQL_EXECUTION_ID_KEY = "spark.sql.execution.id"
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index e2cf3ad..f884fec 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch
import scala.util.control.NonFatal
import org.apache.spark.{ui, SparkConf}
+import org.apache.spark.kyuubi.{SparkContextHelper,
SparkSQLEngineEventListener, SparkSQLEngineListener}
import org.apache.spark.kyuubi.SparkSQLEngineListener
import org.apache.spark.kyuubi.SparkUtilsHelper.getLocalDir
import org.apache.spark.sql.SparkSession
@@ -38,16 +39,17 @@ import org.apache.kyuubi.ha.client.RetryPolicies
import org.apache.kyuubi.service.Serverable
import org.apache.kyuubi.util.SignalRegister
-case class SparkSQLEngine(
- spark: SparkSession,
- store: EngineEventsStore) extends Serverable("SparkSQLEngine") {
+case class SparkSQLEngine(spark: SparkSession) extends
Serverable("SparkSQLEngine") {
override val backendService = new SparkSQLBackendService(spark)
override val frontendServices = Seq(new SparkTBinaryFrontendService(this))
override def initialize(conf: KyuubiConf): Unit = {
- val listener = new SparkSQLEngineListener(this, store)
+ val listener = new SparkSQLEngineListener(this)
spark.sparkContext.addSparkListener(listener)
+ val kvStore = SparkContextHelper.getKvStore(spark.sparkContext)
+ val engineEventListener = new SparkSQLEngineEventListener(kvStore, conf)
+ spark.sparkContext.addSparkListener(engineEventListener)
super.initialize(conf)
}
@@ -122,8 +124,7 @@ object SparkSQLEngine extends Logging {
}
def startEngine(spark: SparkSession): Unit = {
- val store = new EngineEventsStore(kyuubiConf)
- currentEngine = Some(new SparkSQLEngine(spark, store))
+ currentEngine = Some(new SparkSQLEngine(spark))
currentEngine.foreach { engine =>
// start event logging ahead so that we can capture all statuses
val eventLogging = new EventLoggingService(spark.sparkContext)
@@ -145,7 +146,13 @@ object SparkSQLEngine extends Logging {
}
try {
engine.start()
- ui.EngineTab(engine)
+ val kvStore = SparkContextHelper.getKvStore(spark.sparkContext)
+ val store = new EngineEventsStore(kvStore)
+ ui.EngineTab(
+ Some(engine),
+ SparkContextHelper.getSparkUI(spark.sparkContext),
+ store,
+ kyuubiConf)
val event = EngineEvent(engine)
info(event)
EventLoggingService.onEvent(event)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala
index d05cd8e..d69f0e2 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStore.scala
@@ -17,12 +17,9 @@
package org.apache.kyuubi.engine.spark.events
-import java.util.concurrent.ConcurrentHashMap
+import scala.collection.JavaConverters._
-import scala.collection.JavaConverters.collectionAsScalaIterableConverter
-
-import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_UI_SESSION_LIMIT,
ENGINE_UI_STATEMENT_LIMIT}
+import org.apache.spark.util.kvstore.KVStore
/**
* A memory store that tracking the number of statements and sessions, it
provides:
@@ -32,106 +29,44 @@ import
org.apache.kyuubi.config.KyuubiConf.{ENGINE_UI_SESSION_LIMIT, ENGINE_UI_S
* 1). remove the finished events first.
* 2). remove the active events if still reach the threshold.
*/
-class EngineEventsStore(conf: KyuubiConf) {
-
- /**
- * The number of SQL client sessions kept in the Kyuubi Query Engine web UI.
- */
- private val retainedSessions: Int = conf.get(ENGINE_UI_SESSION_LIMIT)
-
- /**
- * The number of statements kept in the Kyuubi Query Engine web UI.
- */
- private val retainedStatements: Int = conf.get(ENGINE_UI_STATEMENT_LIMIT)
-
- /**
- * store all session events.
- */
- val sessions = new ConcurrentHashMap[String, SessionEvent]
+class EngineEventsStore(store: KVStore) {
/**
* get all session events order by startTime
*/
def getSessionList: Seq[SessionEvent] = {
- sessions.values().asScala.toSeq.sortBy(_.startTime)
+ store.view(classOf[SessionEvent]).asScala.toSeq
}
def getSession(sessionId: String): Option[SessionEvent] = {
- Option(sessions.get(sessionId))
- }
-
- /**
- * save session events and check the capacity threshold
- */
- def saveSession(sessionEvent: SessionEvent): Unit = {
- sessions.put(sessionEvent.sessionId, sessionEvent)
- checkSessionCapacity()
- }
-
- /**
- * cleanup the session events if reach the threshold
- */
- private def checkSessionCapacity(): Unit = {
- var countToDelete = sessions.size - retainedSessions
-
- val reverseSeq =
sessions.values().asScala.toSeq.sortBy(_.startTime).reverse
-
- // remove finished sessions first.
- for (event <- reverseSeq if event.endTime != -1L && countToDelete > 0) {
- sessions.remove(event.sessionId)
- countToDelete -= 1
- }
-
- // remove active event if still reach the threshold
- for (event <- reverseSeq if countToDelete > 0) {
- sessions.remove(event.sessionId)
- countToDelete -= 1
+ try {
+ Some(store.read(classOf[SessionEvent], sessionId))
+ } catch {
+ case _: NoSuchElementException => None
}
}
/**
- * store all statements events.
- */
- val statements = new ConcurrentHashMap[String, SparkOperationEvent]
-
- /**
* get all statement events order by startTime
*/
def getStatementList: Seq[SparkOperationEvent] = {
- statements.values().asScala.toSeq.sortBy(_.createTime)
+ store.view(classOf[SparkOperationEvent]).asScala.toSeq
}
def getStatement(statementId: String): Option[SparkOperationEvent] = {
- Option(statements.get(statementId))
+ try {
+ Some(store.read(classOf[SparkOperationEvent], statementId))
+ } catch {
+ case _: NoSuchElementException => None
+ }
}
- /**
- * save statement events and check the capacity threshold
- */
- def saveStatement(statementEvent: SparkOperationEvent): Unit = {
- statements.put(statementEvent.statementId, statementEvent)
- checkStatementCapacity()
+ def getSessionCount: Long = {
+ store.count(classOf[SessionEvent])
}
- /**
- * cleanup the statement events if reach the threshold
- */
- private def checkStatementCapacity(): Unit = {
- var countToDelete = statements.size - retainedStatements
-
- val reverseSeq =
statements.values().asScala.toSeq.sortBy(_.createTime).reverse
-
- // remove finished statements first.
- for (event <- reverseSeq if event.completeTime != -1L && countToDelete >
0) {
- statements.remove(event.statementId)
- countToDelete -= 1
- }
-
- // remove active event if still reach the threshold
- for (event <- reverseSeq if countToDelete > 0) {
- statements.remove(event.statementId)
- countToDelete -= 1
- }
+ def getStatementCount: Long = {
+ store.count(classOf[SparkOperationEvent])
}
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
index ce21c39..670d085 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SessionEvent.scala
@@ -17,11 +17,14 @@
package org.apache.kyuubi.engine.spark.events
+import com.fasterxml.jackson.annotation.JsonIgnore
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.kvstore.KVIndex
import org.apache.kyuubi.Utils
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.KVIndexParam
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
/**
@@ -35,7 +38,7 @@ import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
* @param totalOperations how many queries and meta calls
*/
case class SessionEvent(
- sessionId: String,
+ @KVIndexParam sessionId: String,
engineId: String,
username: String,
ip: String,
@@ -55,6 +58,9 @@ case class SessionEvent(
endTime - startTime
}
}
+
+ @JsonIgnore @KVIndex("endTime")
+ private def endTimeIndex: Long = if (endTime > 0L) endTime else -1L
}
object SessionEvent {
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
index 87fbc87..d496989 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/events/SparkOperationEvent.scala
@@ -17,10 +17,13 @@
package org.apache.kyuubi.engine.spark.events
+import com.fasterxml.jackson.annotation.JsonIgnore
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.kvstore.KVIndex
import org.apache.kyuubi.Utils
+import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.KVIndexParam
import org.apache.kyuubi.engine.spark.operation.SparkOperation
/**
@@ -45,7 +48,7 @@ import org.apache.kyuubi.engine.spark.operation.SparkOperation
* @param executionId the query execution id of this operation
*/
case class SparkOperationEvent(
- statementId: String,
+ @KVIndexParam statementId: String,
statement: String,
shouldRunAsync: Boolean,
state: String,
@@ -69,6 +72,9 @@ case class SparkOperationEvent(
completeTime - createTime
}
}
+
+ @JsonIgnore @KVIndex("completeTime")
+ private def completeTimeIndex: Long = if (completeTime > 0L) completeTime
else -1L
}
object SparkOperationEvent {
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
index 87e74db..ba67151 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkContextHelper.scala
@@ -24,6 +24,8 @@ import org.apache.spark.scheduler.SchedulerBackend
import
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.local.LocalSchedulerBackend
+import org.apache.spark.status.ElementTrackingStore
+import org.apache.spark.ui.SparkUI
import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
@@ -40,6 +42,14 @@ object SparkContextHelper extends Logging {
new SparkHistoryEventLogger(sc)
}
+ def getKvStore(sc: SparkContext): ElementTrackingStore = {
+ sc.statusStore.store.asInstanceOf[ElementTrackingStore]
+ }
+
+ def getSparkUI(sc: SparkContext): Option[SparkUI] = {
+ sc.ui
+ }
+
def updateDelegationTokens(sc: SparkContext, creds: Credentials): Unit = {
val bytes = SparkHadoopUtil.get.serialize(creds)
sc.schedulerBackend match {
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineEventListener.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineEventListener.scala
new file mode 100644
index 0000000..52e50b5
--- /dev/null
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineEventListener.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.kyuubi
+
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.status.{ElementTrackingStore, KVUtils}
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_UI_SESSION_LIMIT,
ENGINE_UI_STATEMENT_LIMIT}
+import org.apache.kyuubi.engine.spark.events.{SessionEvent,
SparkOperationEvent}
+
+class SparkSQLEngineEventListener(
+ kvstore: ElementTrackingStore,
+ kyuubiConf: KyuubiConf) extends SparkListener {
+
+ /**
+ * The number of SQL client sessions kept in the Kyuubi Query Engine web UI.
+ */
+ private val retainedSessions: Int = kyuubiConf.get(ENGINE_UI_SESSION_LIMIT)
+
+ /**
+ * The number of statements kept in the Kyuubi Query Engine web UI.
+ */
+ private val retainedStatements: Int =
kyuubiConf.get(ENGINE_UI_STATEMENT_LIMIT)
+
+ kvstore.addTrigger(classOf[SessionEvent], retainedSessions) { count =>
+ cleanupSession(count)
+ }
+
+ kvstore.addTrigger(classOf[SparkOperationEvent], retainedStatements) { count
=>
+ cleanupOperation(count)
+ }
+
+ override def onOtherEvent(event: SparkListenerEvent): Unit = {
+ event match {
+ case e: SessionEvent => updateSessionStore(e)
+ case e: SparkOperationEvent => updateStatementStore(e)
+ case _ => // Ignore
+ }
+ }
+
+ private def updateSessionStore(event: SessionEvent): Unit = {
+ kvstore.write(event, event.endTime == -1L)
+ }
+
+ private def updateStatementStore(event: SparkOperationEvent): Unit = {
+ kvstore.write(event, true)
+ }
+
+ private def cleanupSession(count: Long): Unit = {
+ val countToDelete = calculateNumberToRemove(count, retainedSessions)
+ if (countToDelete <= 0L) {
+ return
+ }
+ val view = kvstore.view(classOf[SessionEvent]).index("endTime").first(0L)
+ val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j =>
+ j.endTime != 0L
+ }
+
+ toDelete.foreach { j => kvstore.delete(j.getClass, j.sessionId) }
+ }
+
+ private def cleanupOperation(count: Long): Unit = {
+ val countToDelete = calculateNumberToRemove(count, retainedStatements)
+ if (countToDelete <= 0L) {
+ return
+ }
+ val view =
kvstore.view(classOf[SparkOperationEvent]).index("completeTime").first(0L)
+ val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j =>
+ j.completeTime != 0
+ }
+ toDelete.foreach { j => kvstore.delete(j.getClass, j.statementId) }
+ }
+
+ private def calculateNumberToRemove(dataSize: Long, retainedSize: Long):
Long = {
+ if (dataSize > retainedSize) {
+ math.max(retainedSize / 10L, dataSize - retainedSize)
+ } else {
+ 0L
+ }
+ }
+
+}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
index 77414db..8e32b53 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkSQLEngineListener.scala
@@ -30,7 +30,6 @@ import org.apache.kyuubi.Logging
import org.apache.kyuubi.Utils.stringifyException
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_STATEMENT_ID_KEY
-import org.apache.kyuubi.engine.spark.events.{EngineEventsStore, SessionEvent,
SparkOperationEvent}
import org.apache.kyuubi.service.{Serverable, ServiceState}
/**
@@ -38,9 +37,7 @@ import org.apache.kyuubi.service.{Serverable, ServiceState}
*
* @param server the corresponding engine
*/
-class SparkSQLEngineListener(
- server: Serverable,
- store: EngineEventsStore) extends SparkListener with Logging {
+class SparkSQLEngineListener(server: Serverable) extends SparkListener with
Logging {
// the conf of server is null before initialized, use lazy val here
private lazy val deregisterExceptions: Seq[String] =
@@ -116,19 +113,4 @@ class SparkSQLEngineListener(
case e => e
}
- override def onOtherEvent(event: SparkListenerEvent): Unit = {
- event match {
- case e: SessionEvent => updateSessionStore(e)
- case e: SparkOperationEvent => updateStatementStore(e)
- case _ => // Ignore
- }
- }
-
- private def updateSessionStore(event: SessionEvent): Unit = {
- store.saveSession(event)
- }
-
- private def updateStatementStore(event: SparkOperationEvent): Unit = {
- store.saveStatement(event)
- }
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala
index b18888f..d7b578c 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala
@@ -33,7 +33,7 @@ import org.apache.kyuubi.{KYUUBI_VERSION, Utils}
import org.apache.kyuubi.engine.spark.events.{SessionEvent,
SparkOperationEvent}
case class EnginePage(parent: EngineTab) extends WebUIPage("") {
- private val store = parent.engine.store
+ private val store = parent.store
override def render(request: HttpServletRequest): Seq[Node] = {
val content =
@@ -42,8 +42,8 @@ case class EnginePage(parent: EngineTab) extends
WebUIPage("") {
stop(request) ++
<br/> ++
<h4>
- {parent.engine.backendService.sessionManager.getOpenSessionCount}
session(s) are online,
- running
{parent.engine.backendService.sessionManager.operationManager.getOperationCount}
+ {store.getSessionCount} session(s) are online,
+ running {store.getStatementCount}
operations
</h4> ++
generateSessionStatsTable(request) ++
@@ -52,7 +52,7 @@ case class EnginePage(parent: EngineTab) extends
WebUIPage("") {
}
private def generateBasicStats(): Seq[Node] = {
- val timeSinceStart = System.currentTimeMillis() -
parent.engine.getStartTime
+ val timeSinceStart = parent.endTime() - parent.startTime
<ul class ="list-unstyled">
<li>
<strong>Kyuubi Version: </strong>
@@ -60,24 +60,32 @@ case class EnginePage(parent: EngineTab) extends
WebUIPage("") {
</li>
<li>
<strong>Started at: </strong>
- {new Date(parent.engine.getStartTime)}
- </li>
- <li>
- <strong>Latest Logout at: </strong>
- {new
Date(parent.engine.backendService.sessionManager.latestLogoutTime)}
+ {new Date(parent.startTime)}
</li>
+ {
+ parent.engine.map { engine =>
+ <li>
+ <strong>Latest Logout at: </strong>
+ {new Date(engine.backendService.sessionManager.latestLogoutTime)}
+ </li>
+ }.getOrElse(Seq.empty)
+ }
<li>
<strong>Time since start: </strong>
{formatDurationVerbose(timeSinceStart)}
</li>
- <li>
- <strong>Background execution pool threads alive: </strong>
- {parent.engine.backendService.sessionManager.getExecPoolSize}
- </li>
- <li>
- <strong>Background execution pool threads active: </strong>
- {parent.engine.backendService.sessionManager.getActiveCount}
- </li>
+ {
+ parent.engine.map { engine =>
+ <li>
+ <strong>Background execution pool threads alive: </strong>
+ {engine.backendService.sessionManager.getExecPoolSize}
+ </li>
+ <li>
+ <strong>Background execution pool threads active: </strong>
+ {engine.backendService.sessionManager.getActiveCount}
+ </li>
+ }.getOrElse(Seq.empty)
+ }
</ul>
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala
index 5d21459..18c12e3 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala
@@ -29,7 +29,7 @@ import org.apache.spark.util.Utils
/** Page for Spark Web UI that shows statistics of jobs running in the engine
server */
case class EngineSessionPage(parent: EngineTab)
extends WebUIPage("session") with Logging {
- val store = parent.engine.store
+ val store = parent.store
/** Render the page */
def render(request: HttpServletRequest): Seq[Node] = {
@@ -55,23 +55,30 @@ case class EngineSessionPage(parent: EngineTab)
}
/** Generate basic stats of the engine server */
- private def generateBasicStats(): Seq[Node] = {
- val timeSinceStart = System.currentTimeMillis() -
parent.engine.getStartTime
- <ul class ="list-unstyled">
- <li>
- <strong>Started at: </strong>
- {new Date(parent.engine.getStartTime)}
- </li>
- <li>
- <strong>Latest Logout at: </strong>
- {new
Date(parent.engine.backendService.sessionManager.latestLogoutTime)}
- </li>
- <li>
- <strong>Time since start: </strong>
- {formatDurationVerbose(timeSinceStart)}
- </li>
- </ul>
- }
+ private def generateBasicStats(): Seq[Node] =
+ if (parent.engine.isDefined) {
+ val timeSinceStart = parent.endTime() - parent.startTime
+ <ul class ="list-unstyled">
+ <li>
+ <strong>Started at: </strong>
+ {new Date(parent.startTime)}
+ </li>
+ {
+ parent.engine.map { engine =>
+ <li>
+ <strong>Latest Logout at: </strong>
+ {new Date(engine.backendService.sessionManager.latestLogoutTime)}
+ </li>
+ }.getOrElse(Seq.empty)
+ }
+ <li>
+ <strong>Time since start: </strong>
+ {formatDurationVerbose(timeSinceStart)}
+ </li>
+ </ul>
+ } else {
+ Seq.empty
+ }
/** Generate stats of batch statements of the engine server */
private def generateSQLStatsTable(request: HttpServletRequest, sessionID:
String): Seq[Node] = {
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineTab.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineTab.scala
index 0ebb5d1..e0c6e1a 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineTab.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineTab.scala
@@ -24,25 +24,42 @@ import scala.util.control.NonFatal
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.spark.SparkSQLEngine
+import org.apache.kyuubi.engine.spark.events.EngineEventsStore
import org.apache.kyuubi.service.ServiceState
/**
* Note that [[SparkUITab]] is private for Spark
*/
-case class EngineTab(engine: SparkSQLEngine)
- extends SparkUITab(engine.spark.sparkContext.ui.orNull, "kyuubi") with
Logging {
+case class EngineTab(
+ engine: Option[SparkSQLEngine],
+ sparkUI: Option[SparkUI],
+ store: EngineEventsStore,
+ kyuubiConf: KyuubiConf)
+ extends SparkUITab(sparkUI.orNull, "kyuubi") with Logging {
override val name: String = "Kyuubi Query Engine"
- val killEnabled = engine.getConf.get(KyuubiConf.ENGINE_UI_STOP_ENABLED)
+ val killEnabled = kyuubiConf.get(KyuubiConf.ENGINE_UI_STOP_ENABLED)
- engine.spark.sparkContext.ui.foreach { ui =>
+ val startTime = engine.map(_.getStartTime).getOrElse {
+ sparkUI
+ .map(ui => ui.store.applicationInfo().attempts.head.startTime.getTime)
+ .getOrElse(0L)
+ }
+
+ def endTime(): Long = engine.map(_ => System.currentTimeMillis()).getOrElse {
+ sparkUI
+ .map(ui => ui.store.applicationInfo().attempts.head.endTime.getTime)
+ .getOrElse(0L)
+ }
+
+ sparkUI.foreach { ui =>
this.attachPage(EnginePage(this))
this.attachPage(EngineSessionPage(this))
ui.attachTab(this)
Utils.addShutdownHook(() => ui.detachTab(this))
}
- engine.spark.sparkContext.ui.foreach { ui =>
+ sparkUI.foreach { ui =>
try {
// Spark shade the jetty package so here we use reflect
Class.forName("org.apache.spark.ui.SparkUI")
@@ -68,8 +85,8 @@ case class EngineTab(engine: SparkSQLEngine)
}
def handleKillRequest(request: HttpServletRequest): Unit = {
- if (killEnabled && engine != null && engine.getServiceState !=
ServiceState.STOPPED) {
- engine.stop()
+ if (killEnabled && engine.isDefined && engine.get.getServiceState !=
ServiceState.STOPPED) {
+ engine.get.stop()
}
}
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/KyuubiHistoryServerPlugin.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/KyuubiHistoryServerPlugin.scala
new file mode 100644
index 0000000..f2eb96d
--- /dev/null
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/KyuubiHistoryServerPlugin.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import org.apache.spark.SparkConf
+import org.apache.spark.kyuubi.SparkSQLEngineEventListener
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore}
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.spark.events.EngineEventsStore
+
+// scalastyle:off line.size.limit
+/**
+ * HistoryServer plugin for Kyuubi, It can be used as a plugin in
SparkHistoryServer to make SparkHistoryServer UI display Kyuubi's Tab.
+ * We can use it like:
+ * - Copy the kyuubi-spark-sql-engine jar to $SPARK_HOME/jars and restart
SparkHistoryServer.
+ * - In addition, we can add kyuubi configurations to spark-defaults.conf
prefixed with "spark.kyuubi.".
+ */
+// scalastyle:on line.size.limit
+class KyuubiHistoryServerPlugin extends AppHistoryServerPlugin {
+
+ override def createListeners(conf: SparkConf, store: ElementTrackingStore):
Seq[SparkListener] = {
+ val kyuubiConf = mergedKyuubiConf(conf)
+ Seq(new SparkSQLEngineEventListener(store, kyuubiConf))
+ }
+
+ override def setupUI(ui: SparkUI): Unit = {
+ val kyuubiConf = mergedKyuubiConf(ui.conf)
+ val store = new EngineEventsStore(ui.store.store)
+ if (store.getSessionCount > 0) {
+ EngineTab(
+ None,
+ Some(ui),
+ store,
+ kyuubiConf)
+ }
+ }
+
+ private def mergedKyuubiConf(sparkConf: SparkConf): KyuubiConf = {
+ val kyuubiConf = KyuubiConf()
+ val sparkToKyuubiPrefix = "spark.kyuubi."
+ sparkConf.getAllWithPrefix(sparkToKyuubiPrefix).foreach { case (k, v) =>
+ kyuubiConf.set(s"kyuubi.$k", v)
+ }
+ kyuubiConf
+ }
+
+}
diff --git
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala
index 881ff0e..306e669 100644
---
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala
+++
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/events/EngineEventsStoreSuite.scala
@@ -17,206 +17,73 @@
package org.apache.kyuubi.engine.spark.events

import org.apache.spark.kyuubi.SparkContextHelper

import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
import org.apache.kyuubi.operation.HiveJDBCTestHelper

/**
 * Integration tests for EngineEventsStore: statements are executed over JDBC
 * against a real Spark SQL engine and the resulting session / operation events
 * are read back from the engine's KVStore.
 */
class EngineEventsStoreSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {

  // Store under test; rebuilt for every test from the freshly started engine's KVStore.
  var store: EngineEventsStore = _

  override protected def beforeEach(): Unit = {
    super.beforeEach()
    startSparkEngine()
    // Wrap the engine's live KVStore so assertions observe exactly the events
    // produced by this test's own JDBC activity.
    val kvStore = SparkContextHelper.getKvStore(spark.sparkContext)
    store = new EngineEventsStore(kvStore)
  }

  override protected def afterEach(): Unit = {
    super.afterEach()
    // Stop the engine so the next test starts from an empty event store.
    stopSparkEngine()
  }

  test("EngineEventsStore session test") {
    // Engine was just started: no session events may exist yet.
    assert(store.getSessionList.isEmpty)
    assert(store.getSessionCount == 0)
    withJdbcStatement() { statement =>
      statement.execute(
        """
        |SELECT
        | l.id % 100 k,
        | sum(l.id) sum,
        | count(l.id) cnt,
        | avg(l.id) avg,
        | min(l.id) min,
        | max(l.id) max
        |from range(0, 100000L, 1, 100) l
        | left join range(0, 100000L, 2, 100) r ON l.id = r.id
        |GROUP BY 1""".stripMargin)
    }
    // Exactly one JDBC connection was opened, so exactly one session is recorded.
    assert(store.getSessionList.size == 1)
    assert(store.getSessionCount == 1)
  }

  test("EngineEventsStore statement test") {
    assert(store.getStatementList.isEmpty)
    assert(store.getStatementCount == 0)
    // Keep a handle on the exact SQL text: the store must record the statement verbatim.
    val sql = """
        |SELECT
        | l.id % 100 k,
        | sum(l.id) sum,
        | count(l.id) cnt,
        | avg(l.id) avg,
        | min(l.id) min,
        | max(l.id) max
        |from range(0, 100000L, 1, 100) l
        | left join range(0, 100000L, 2, 100) r ON l.id = r.id
        |GROUP BY 1""".stripMargin
    withJdbcStatement() { statement =>
      statement.execute(sql)
    }
    val statementList = store.getStatementList
    assert(statementList.size == 1)
    assert(store.getStatementCount == 1)
    // The stored operation event must carry the executed SQL text unchanged.
    val statementId = statementList(0).statementId
    assert(store.getStatement(statementId).get.statement === sql)
  }

  // No extra Kyuubi configuration is needed for these tests.
  override def withKyuubiConf: Map[String, String] = Map.empty

  override protected def jdbcUrl: String = getJdbcUrl
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala
index ca90d92..3803bd0 100644
---
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala
+++
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala
@@ -17,12 +17,16 @@
package org.apache.spark.ui
+import scala.collection.JavaConverters._
+
import org.apache.http.client.methods.HttpGet
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
import org.apache.spark.SparkContext
+import org.apache.spark.kyuubi.SparkContextHelper
import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
+import org.apache.kyuubi.engine.spark.events.{SessionEvent,
SparkOperationEvent}
import org.apache.kyuubi.operation.HiveJDBCTestHelper
class EngineTabSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
@@ -36,6 +40,16 @@ class EngineTabSuite extends WithSparkSQLEngine with
HiveJDBCTestHelper {
super.beforeAll()
}
+ override protected def beforeEach(): Unit = {
+ val kvstore = SparkContextHelper.getKvStore(spark.sparkContext)
+ kvstore.view(classOf[SessionEvent]).closeableIterator().asScala.foreach(j
=> {
+ kvstore.delete(j.getClass, j.sessionId)
+ })
+
kvstore.view(classOf[SparkOperationEvent]).closeableIterator().asScala.foreach(j
=> {
+ kvstore.delete(j.getClass, j.statementId)
+ })
+ }
+
test("basic stats for engine tab") {
assert(spark.sparkContext.ui.nonEmpty)
val client = HttpClients.createDefault()