This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new a4345ee46 [KYUUBI #5894] Separate closed and online
sessions/statements in the SparkUI's engine tab
a4345ee46 is described below
commit a4345ee462fd973df447a8f37d8c1f5d23772811
Author: wangjunbo <[email protected]>
AuthorDate: Tue Jan 16 20:27:45 2024 +0800
[KYUUBI #5894] Separate closed and online sessions/statements in the
SparkUI's engine tab
Separate closed and online sessions[statements] in the SparkUI's engine tab
# :mag: Description
## Issue References 🔗
This pull request fixes #5894
## Describe Your Solution 🔧
Separate closed and online sessions[statements] in the SparkUI's engine tab

## Types of changes :bookmark:
- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan 🧪
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
Unit tests will be added soon.
---
# Checklist 📝
- [ ] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #5946 from Kwafoor/kyuubi_5894.
Closes #5894
ade3fc1cd [wangjunbo] fix
d71a61a96 [wangjunbo] rename span id
0b600f54a [wangjunbo] rename span id
f56ca7e89 [wangjunbo] fix code
79b8e33a5 [wangjunbo] delete test code
cd3ecace7 [wangjunbo] fix CI test, fix html document id duplicate
3a0e3708f [wangjunbo] [KYUUBI #5894] fix scalastyle check
83c7b1552 [wangjunbo] [KYUUBI #5894] Separate closed and online
sessions[statements] in the SparkUI's engine tab
Authored-by: wangjunbo <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../scala/org/apache/spark/ui/EnginePage.scala | 294 +++++++++++++++------
.../org/apache/spark/ui/EngineSessionPage.scala | 151 ++++++++---
.../scala/org/apache/spark/ui/EngineTabSuite.scala | 21 +-
3 files changed, 337 insertions(+), 129 deletions(-)
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala
index 7188ac62f..cae0c03bf 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EnginePage.scala
@@ -23,6 +23,7 @@ import java.util.Date
import javax.servlet.http.HttpServletRequest
import scala.collection.JavaConverters.mapAsScalaMapConverter
+import scala.collection.mutable
import scala.xml.{Node, Unparsed}
import org.apache.commons.text.StringEscapeUtils
@@ -36,18 +37,46 @@ case class EnginePage(parent: EngineTab) extends
WebUIPage("") {
private val store = parent.store
override def render(request: HttpServletRequest): Seq[Node] = {
+ val onlineSession = new mutable.ArrayBuffer[SessionEvent]()
+ val closedSession = new mutable.ArrayBuffer[SessionEvent]()
+
+ val runningSqlStat = new mutable.ArrayBuffer[SparkOperationEvent]()
+ val completedSqlStat = new mutable.ArrayBuffer[SparkOperationEvent]()
+ val failedSqlStat = new mutable.ArrayBuffer[SparkOperationEvent]()
+
+ store.getSessionList.foreach { s =>
+ if (s.endTime <= 0L) {
+ onlineSession += s
+ } else {
+ closedSession += s
+ }
+ }
+
+ store.getStatementList.foreach { op =>
+ if (op.completeTime <= 0L) {
+ runningSqlStat += op
+ } else if (op.exception.isDefined) {
+ failedSqlStat += op
+ } else {
+ completedSqlStat += op
+ }
+ }
+
val content =
generateBasicStats() ++
<br/> ++
stop(request) ++
<br/> ++
<h4>
- {store.getSessionCount} session(s) are online,
- running {store.getStatementCount}
- operations
+ {onlineSession.size} session(s) are online,
+ running {runningSqlStat.size} operation(s)
</h4> ++
- generateSessionStatsTable(request) ++
- generateStatementStatsTable(request)
+ generateSessionStatsTable(request, onlineSession.toSeq,
closedSession.toSeq) ++
+ generateStatementStatsTable(
+ request,
+ runningSqlStat.toSeq,
+ completedSqlStat.toSeq,
+ failedSqlStat.toSeq)
UIUtils.headerSparkPage(request, parent.name, content, parent)
}
@@ -129,102 +158,199 @@ case class EnginePage(parent: EngineTab) extends
WebUIPage("") {
}
}
- /** Generate stats of statements for the engine */
- private def generateStatementStatsTable(request: HttpServletRequest):
Seq[Node] = {
-
- val numStatement = store.getStatementList.size
-
- val table =
- if (numStatement > 0) {
+ /** Generate stats of running statements for the engine */
+ private def generateStatementStatsTable(
+ request: HttpServletRequest,
+ running: Seq[SparkOperationEvent],
+ completed: Seq[SparkOperationEvent],
+ failed: Seq[SparkOperationEvent]): Seq[Node] = {
+
+ val content = mutable.ListBuffer[Node]()
+ if (running.nonEmpty) {
+ val sqlTableTag = "running-sqlstat"
+ val table =
+ statementStatsTable(request, sqlTableTag, parent, running)
+ content ++=
+ <span id="running-sqlstat" class="collapse-aggregated-runningSqlstat
collapse-table"
+ onClick="collapseTable('collapse-aggregated-runningSqlstat',
+ 'aggregated-runningSqlstat')">
+ <h4>
+ <span class="collapse-table-arrow arrow-open"></span>
+ <a>Running Statement Statistics (
+ {running.size}
+ )</a>
+ </h4>
+ </span> ++
+ <div class="aggregated-runningSqlstat collapsible-table">
+ {table}
+ </div>
+ }
- val sqlTableTag = "sqlstat"
+ if (completed.nonEmpty) {
+ val table = {
+ val sqlTableTag = "completed-sqlstat"
+ statementStatsTable(
+ request,
+ sqlTableTag,
+ parent,
+ completed)
+ }
- val sqlTablePage =
-
Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1)
+ content ++=
+ <span id="completed-sqlstat"
class="collapse-aggregated-completedSqlstat collapse-table"
+ onClick="collapseTable('collapse-aggregated-completedSqlstat',
+ 'aggregated-completedSqlstat')">
+ <h4>
+ <span class="collapse-table-arrow arrow-open"></span>
+ <a>Completed Statement Statistics (
+ {completed.size}
+ )</a>
+ </h4>
+ </span> ++
+ <div class="aggregated-completedSqlstat collapsible-table">
+ {table}
+ </div>
+ }
- try {
- Some(new StatementStatsPagedTable(
- request,
- parent,
- store.getStatementList,
- "kyuubi",
- UIUtils.prependBaseUri(request, parent.basePath),
- sqlTableTag).table(sqlTablePage))
- } catch {
- case e @ (_: IllegalArgumentException | _:
IndexOutOfBoundsException) =>
- Some(<div class="alert alert-error">
- <p>Error while rendering job table:</p>
- <pre>
- {Utils.stringifyException(e)}
- </pre>
- </div>)
- }
- } else {
- None
+ if (failed.nonEmpty) {
+ val table = {
+ val sqlTableTag = "failed-sqlstat"
+ statementStatsTable(
+ request,
+ sqlTableTag,
+ parent,
+ failed)
}
- val content =
- <span id="sqlstat" class="collapse-aggregated-sqlstat collapse-table"
- onClick="collapseTable('collapse-aggregated-sqlstat',
- 'aggregated-sqlstat')">
- <h4>
- <span class="collapse-table-arrow arrow-open"></span>
- <a>Statement Statistics ({numStatement})</a>
- </h4>
- </span> ++
- <div class="aggregated-sqlstat collapsible-table">
- {table.getOrElse("No statistics have been generated yet.")}
+
+ content ++=
+ <span id="failed-sqlstat" class="collapse-aggregated-failedSqlstat
collapse-table"
+ onClick="collapseTable('collapse-aggregated-failedSqlstat',
+ 'aggregated-failedSqlstat')">
+ <h4>
+ <span class="collapse-table-arrow arrow-open"></span>
+ <a>Failed Statement Statistics (
+ {failed.size}
+ )</a>
+ </h4>
+ </span> ++
+ <div class="aggregated-failedSqlstat collapsible-table">
+ {table}
</div>
+ }
content
}
- /** Generate stats of sessions for the engine */
- private def generateSessionStatsTable(request: HttpServletRequest):
Seq[Node] = {
- val numSessions = store.getSessionList.size
- val table =
- if (numSessions > 0) {
+ private def statementStatsTable(
+ request: HttpServletRequest,
+ sqlTableTag: String,
+ parent: EngineTab,
+ data: Seq[SparkOperationEvent]): Seq[Node] = {
+
+ val sqlTablePage =
+
Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1)
- val sessionTableTag = "sessionstat"
+ try {
+ new StatementStatsPagedTable(
+ request,
+ parent,
+ data,
+ "kyuubi",
+ UIUtils.prependBaseUri(request, parent.basePath),
+ s"${sqlTableTag}-table").table(sqlTablePage)
+ } catch {
+ case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
+ <div class="alert alert-error">
+ <p>Error while rendering job table:</p>
+ <pre>
+ {Utils.stringifyException(e)}
+ </pre>
+ </div>
+ }
+ }
- val sessionTablePage =
-
Option(request.getParameter(s"$sessionTableTag.page")).map(_.toInt).getOrElse(1)
+ /** Generate stats of online sessions for the engine */
+ private def generateSessionStatsTable(
+ request: HttpServletRequest,
+ online: Seq[SessionEvent],
+ closed: Seq[SessionEvent]): Seq[Node] = {
+ val content = mutable.ListBuffer[Node]()
+ if (online.nonEmpty) {
+ val sessionTableTag = "online-sessionstat"
+ val table = sessionTable(
+ request,
+ sessionTableTag,
+ parent,
+ online)
+ content ++=
+ <span id="online-sessionstat"
class="collapse-aggregated-onlineSessionstat collapse-table"
+ onClick="collapseTable('collapse-aggregated-onlineSessionstat',
+ 'aggregated-onlineSessionstat')">
+ <h4>
+ <span class="collapse-table-arrow arrow-open"></span>
+ <a>Online Session Statistics (
+ {online.size}
+ )</a>
+ </h4>
+ </span> ++
+ <div class="aggregated-onlineSessionstat collapsible-table">
+ {table}
+ </div>
+ }
- try {
- Some(new SessionStatsPagedTable(
- request,
- parent,
- store.getSessionList,
- "kyuubi",
- UIUtils.prependBaseUri(request, parent.basePath),
- sessionTableTag).table(sessionTablePage))
- } catch {
- case e @ (_: IllegalArgumentException | _:
IndexOutOfBoundsException) =>
- Some(<div class="alert alert-error">
- <p>Error while rendering job table:</p>
- <pre>
- {Utils.stringifyException(e)}
- </pre>
- </div>)
- }
- } else {
- None
+ if (closed.nonEmpty) {
+ val table = {
+ val sessionTableTag = "closed-sessionstat"
+ sessionTable(
+ request,
+ sessionTableTag,
+ parent,
+ closed)
}
- val content =
- <span id="sessionstat" class="collapse-aggregated-sessionstat
collapse-table"
- onClick="collapseTable('collapse-aggregated-sessionstat',
- 'aggregated-sessionstat')">
- <h4>
- <span class="collapse-table-arrow arrow-open"></span>
- <a>Session Statistics ({numSessions})</a>
- </h4>
- </span> ++
- <div class="aggregated-sessionstat collapsible-table">
- {table.getOrElse("No statistics have been generated yet.")}
+ content ++=
+ <span id="closed-sessionstat"
class="collapse-aggregated-closedSessionstat collapse-table"
+ onClick="collapseTable('collapse-aggregated-closedSessionstat',
+ 'aggregated-closedSessionstat')">
+ <h4>
+ <span class="collapse-table-arrow arrow-open"></span>
+ <a>Closed Session Statistics (
+ {closed.size}
+ )</a>
+ </h4>
+ </span> ++
+ <div class="aggregated-closedSessionstat collapsible-table">
+ {table}
</div>
-
+ }
content
}
+ private def sessionTable(
+ request: HttpServletRequest,
+ sessionTage: String,
+ parent: EngineTab,
+ data: Seq[SessionEvent]): Seq[Node] = {
+ val sessionPage =
+
Option(request.getParameter(s"$sessionTage.page")).map(_.toInt).getOrElse(1)
+ try {
+ new SessionStatsPagedTable(
+ request,
+ parent,
+ data,
+ "kyuubi",
+ UIUtils.prependBaseUri(request, parent.basePath),
+ s"${sessionTage}-table").table(sessionPage)
+ } catch {
+ case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
+ <div class="alert alert-error">
+ <p>Error while rendering job table:</p>
+ <pre>
+ {Utils.stringifyException(e)}
+ </pre>
+ </div>
+ }
+ }
+
private class SessionStatsPagedTable(
request: HttpServletRequest,
parent: EngineTab,
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala
index cdfc6d313..46011ceae 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/ui/EngineSessionPage.scala
@@ -20,6 +20,7 @@ package org.apache.spark.ui
import java.util.Date
import javax.servlet.http.HttpServletRequest
+import scala.collection.mutable
import scala.xml.Node
import org.apache.spark.internal.Logging
@@ -27,6 +28,8 @@ import
org.apache.spark.internal.config.SECRET_REDACTION_PATTERN
import org.apache.spark.ui.UIUtils._
import org.apache.spark.util.Utils
+import org.apache.kyuubi.engine.spark.events.SparkOperationEvent
+
/** Page for Spark Web UI that shows statistics of jobs running in the engine
server */
case class EngineSessionPage(parent: EngineTab)
extends WebUIPage("session") with Logging {
@@ -126,50 +129,118 @@ case class EngineSessionPage(parent: EngineTab)
/** Generate stats of batch statements of the engine server */
private def generateSQLStatsTable(request: HttpServletRequest, sessionID:
String): Seq[Node] = {
- val executionList = store.getStatementList
+ val running = new mutable.ArrayBuffer[SparkOperationEvent]()
+ val completed = new mutable.ArrayBuffer[SparkOperationEvent]()
+ val failed = new mutable.ArrayBuffer[SparkOperationEvent]()
+
+ store.getStatementList
.filter(_.sessionId == sessionID)
- val numStatement = executionList.size
- val table =
- if (numStatement > 0) {
-
- val sqlTableTag = "sqlsessionstat"
-
- val sqlTablePage =
-
Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1)
-
- try {
- Some(new StatementStatsPagedTable(
- request,
- parent,
- executionList,
- "kyuubi/session",
- UIUtils.prependBaseUri(request, parent.basePath),
- sqlTableTag).table(sqlTablePage))
- } catch {
- case e @ (_: IllegalArgumentException | _:
IndexOutOfBoundsException) =>
- Some(<div class="alert alert-error">
- <p>Error while rendering job table:</p>
- <pre>
- {Utils.exceptionString(e)}
- </pre>
- </div>)
+ .foreach { op =>
+ if (op.completeTime <= 0L) {
+ running += op
+ } else if (op.exception.isDefined) {
+ failed += op
+ } else {
+ completed += op
}
- } else {
- None
}
- val content =
- <span id="sqlsessionstat" class="collapse-aggregated-sqlsessionstat
collapse-table"
- onClick="collapseTable('collapse-aggregated-sqlsessionstat',
- 'aggregated-sqlsessionstat')">
- <h4>
- <span class="collapse-table-arrow arrow-open"></span>
- <a>Statement Statistics</a>
- </h4>
- </span> ++
- <div class="aggregated-sqlsessionstat collapsible-table">
- {table.getOrElse("No statistics have been generated yet.")}
- </div>
+ val content = mutable.ListBuffer[Node]()
+ if (running.nonEmpty) {
+ val sqlTableTag = "running-sqlstat"
+ val table = statementStatsTable(request, sqlTableTag, parent,
running.toSeq)
+ content ++=
+ <span id="running-sqlstat" class="collapse-aggregated-runningSqlstat
collapse-table"
+ onClick="collapseTable('collapse-aggregated-runningSqlstat',
+ 'aggregated-runningSqlstat')">
+ <h4>
+ <span class="collapse-table-arrow arrow-open"></span>
+ <a>Running Statement Statistics</a>
+ </h4>
+ </span> ++
+ <div class="aggregated-runningSqlstat collapsible-table">
+ {table}
+ </div>
+ }
+
+ if (completed.nonEmpty) {
+ val table = {
+ val sqlTableTag = "completed-sqlstat"
+ statementStatsTable(
+ request,
+ sqlTableTag,
+ parent,
+ completed.toSeq)
+ }
+
+ content ++=
+ <span id="completed-sqlstat"
class="collapse-aggregated-completedSqlstat collapse-table"
+ onClick="collapseTable('collapse-aggregated-completedSqlstat',
+ 'aggregated-completedSqlstat')">
+ <h4>
+ <span class="collapse-table-arrow arrow-open"></span>
+ <a>Completed Statement Statistics (
+ {completed.size}
+ )</a>
+ </h4>
+ </span> ++
+ <div class="aggregated-completedSqlstat collapsible-table">
+ {table}
+ </div>
+ }
+
+ if (failed.nonEmpty) {
+ val table = {
+ val sqlTableTag = "failed-sqlstat"
+ statementStatsTable(
+ request,
+ sqlTableTag,
+ parent,
+ failed.toSeq)
+ }
+
+ content ++=
+ <span id="failed-sqlstat" class="collapse-aggregated-failedSqlstat
collapse-table"
+ onClick="collapseTable('collapse-aggregated-failedSqlstat',
+ 'aggregated-failedSqlstat')">
+ <h4>
+ <span class="collapse-table-arrow arrow-open"></span>
+ <a>Failed Statement Statistics (
+ {failed.size}
+ )</a>
+ </h4>
+ </span> ++
+ <div class="aggregated-failedSqlstat collapsible-table">
+ {table}
+ </div>
+ }
content
}
+
+ private def statementStatsTable(
+ request: HttpServletRequest,
+ sqlTableTag: String,
+ parent: EngineTab,
+ data: Seq[SparkOperationEvent]): Seq[Node] = {
+ val sqlTablePage =
+
Option(request.getParameter(s"$sqlTableTag.page")).map(_.toInt).getOrElse(1)
+
+ try {
+ new StatementStatsPagedTable(
+ request,
+ parent,
+ data,
+ "kyuubi/session",
+ UIUtils.prependBaseUri(request, parent.basePath),
+ s"${sqlTableTag}").table(sqlTablePage)
+ } catch {
+ case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
+ <div class="alert alert-error">
+ <p>Error while rendering job table:</p>
+ <pre>
+ {Utils.exceptionString(e)}
+ </pre>
+ </div>
+ }
+ }
}
diff --git
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala
index 260dbf87e..ad056a064 100644
---
a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala
+++
b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/spark/ui/EngineTabSuite.scala
@@ -96,10 +96,10 @@ class EngineTabSuite extends WithSparkSQLEngine with
HiveJDBCTestHelper {
val resp = EntityUtils.toString(response.getEntity)
// check session section
- assert(resp.contains("Session Statistics"))
+ assert(resp.contains("Online Session Statistics"))
// check session stats table id
- assert(resp.contains("sessionstat"))
+ assert(resp.contains("onlineSessionstat"))
// check session stats table title
assert(resp.contains("Total Statements"))
@@ -110,10 +110,11 @@ class EngineTabSuite extends WithSparkSQLEngine with
HiveJDBCTestHelper {
assert(spark.sparkContext.ui.nonEmpty)
val client = HttpClients.createDefault()
val req = new HttpGet(spark.sparkContext.uiWebUrl.get + "/kyuubi/")
- val response = client.execute(req)
+ var response = client.execute(req)
assert(response.getStatusLine.getStatusCode === 200)
- val resp = EntityUtils.toString(response.getEntity)
+ var resp = EntityUtils.toString(response.getEntity)
assert(resp.contains("0 session(s) are online,"))
+ assert(!resp.contains("Statement Statistics"))
withJdbcStatement() { statement =>
statement.execute(
"""
@@ -133,13 +134,23 @@ class EngineTabSuite extends WithSparkSQLEngine with
HiveJDBCTestHelper {
// check session section
assert(resp.contains("Statement Statistics"))
+ assert(!resp.contains("Failed Statement Statistics"))
// check sql stats table id
- assert(resp.contains("sqlstat"))
+ assert(resp.contains("runningSqlstat") ||
resp.contains("completedSqlstat"))
+
+ assert(resp.contains("1 session(s) are online,"))
// check sql stats table title
assert(resp.contains("Query Details"))
}
+ response = client.execute(req)
+ assert(response.getStatusLine.getStatusCode === 200)
+ resp = EntityUtils.toString(response.getEntity)
+ assert(resp.contains("0 session(s) are online,"))
+ assert(resp.contains("running 0 operation(s)"))
+ assert(resp.contains("completedSqlstat"))
+ assert(resp.contains("Completed Statement Statistics"))
}
test("statement redact for engine tab") {