This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c124037b975 [SPARK-41752][SQL][UI] Group nested executions under the root execution
c124037b975 is described below
commit c124037b97538b2656d29ce547b2a42209a41703
Author: Linhong Liu <[email protected]>
AuthorDate: Tue Jan 10 20:52:15 2023 +0800
[SPARK-41752][SQL][UI] Group nested executions under the root execution
### What changes were proposed in this pull request?
This PR proposes to group all sub-executions together in the SQL UI if they
belong to the same root execution.
This feature is controlled by the conf `spark.ui.groupSQLSubExecutionEnabled`,
and the default value is `true`.
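For example, the grouping could be turned off at session startup (a minimal sketch; like other `spark.ui.*` options, the conf must be set before the UI is created):
```
// Hypothetical opt-out, e.g. for tools that expect the flat execution list.
val spark = org.apache.spark.sql.SparkSession.builder()
  .config("spark.ui.groupSQLSubExecutionEnabled", "false")
  .getOrCreate()
```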
We can have some follow-up improvements after this PR:
1. Add links to the SQL page and the Job page to indicate the root execution ID.
2. Better handling for the case where the root execution is missing (e.g. evicted
due to the retaining limit). In this PR, such sub-executions are displayed ungrouped.
### Why are the changes needed?
Better user experience.
After PR #39220, a CTAS query triggers a sub-execution to perform the data
insertion, but the current UI displays the two executions separately, which
may confuse users.
In addition, this change should also help structured streaming cases.
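As a side effect, external tools can now tell sub-executions apart from roots via the new `rootExecutionId` field on `SparkListenerSQLExecutionStart` (a minimal sketch; the `RootIdLogger` class below is illustrative, not part of this PR):
```
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart

// Illustrative listener: a root execution reports rootExecutionId == executionId,
// while a sub-execution carries the id of the root it belongs to.
class RootIdLogger extends SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionStart =>
      println(s"execution ${e.executionId} (root: ${e.rootExecutionId})")
    case _ => // ignore other events
  }
}
```
It could be registered with `spark.sparkContext.addSparkListener(new RootIdLogger)`.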
### Does this PR introduce _any_ user-facing change?
Yes, screenshots of the UI change are shown below.
SQL Query:
```
CREATE TABLE t USING PARQUET AS SELECT 'a' as a, 1 as b
```
UI before this PR:
<img width="1074" alt="Screen Shot 2022-12-28 at 4 42 08 PM" src="https://user-images.githubusercontent.com/67896261/209889679-83909bc9-0e15-4ff1-9aeb-3118e4bab524.png">
UI after this PR, with sub-executions collapsed:
<img width="1072" alt="Screen Shot 2022-12-28 at 4 44 32 PM" src="https://user-images.githubusercontent.com/67896261/209889688-973a4ec9-a5dc-4a8b-8618-c0800733fffa.png">
UI after this PR, with a sub-execution expanded:
<img width="1069" alt="Screen Shot 2022-12-28 at 4 44 41 PM" src="https://user-images.githubusercontent.com/67896261/209889718-0e24be12-23d6-4f81-a508-15eac62ec231.png">
### How was this patch tested?
Unit tests, including new cases in `AllExecutionsPageSuite`.
Closes #39268 from linhongliu-db/SPARK-41752.
Authored-by: Linhong Liu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/status/protobuf/store_types.proto | 1 +
.../resources/org/apache/spark/ui/static/webui.css | 12 +
.../org/apache/spark/internal/config/UI.scala | 7 +
.../apache/spark/sql/execution/SQLExecution.scala | 14 ++
.../spark/sql/execution/ui/AllExecutionsPage.scala | 275 +++++++++++++++------
.../sql/execution/ui/SQLAppStatusListener.scala | 6 +-
.../spark/sql/execution/ui/SQLAppStatusStore.scala | 1 +
.../spark/sql/execution/ui/SQLListener.scala | 2 +
.../org/apache/spark/sql/execution/ui/SQLTab.scala | 3 +
.../sql/SQLExecutionUIDataSerializer.scala | 1 +
.../spark/sql/execution/SQLJsonProtocolSuite.scala | 2 +-
.../history/SQLEventFilterBuilderSuite.scala | 2 +-
.../history/SQLLiveEntitiesEventFilterSuite.scala | 4 +-
.../sql/execution/ui/AllExecutionsPageSuite.scala | 50 ++++
.../execution/ui/MetricsAggregationBenchmark.scala | 1 +
.../execution/ui/SQLAppStatusListenerSuite.scala | 12 +-
.../spark/status/api/v1/sql/SqlResourceSuite.scala | 1 +
.../sql/KVStoreProtobufSerializerSuite.scala | 2 +
18 files changed, 319 insertions(+), 77 deletions(-)
diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 04c857f7c3c..e9aaad261f9 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -417,6 +417,7 @@ message SQLExecutionUIData {
repeated int64 stages = 11;
bool metric_values_is_null = 12;
map<int64, string> metric_values = 13;
+ optional int64 root_execution_id = 14;
}
message SparkPlanGraphNode {
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index 0252bc80047..f952f86503e 100755
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -187,6 +187,18 @@ pre {
display: none;
}
+.sub-execution-list {
+ font-size: 0.9rem;
+}
+
+.sub-execution-list.collapsed {
+ display: none;
+}
+
+.table-striped .sub-execution-list table tr {
+ background-color: inherit;
+}
+
.description-input {
overflow: hidden;
text-overflow: ellipsis;
diff --git a/core/src/main/scala/org/apache/spark/internal/config/UI.scala b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
index d09620b8e34..a32e60de2a4 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/UI.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
@@ -229,4 +229,11 @@ private[spark] object UI {
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.createWithDefault("LOCAL")
+
+ val UI_SQL_GROUP_SUB_EXECUTION_ENABLED = ConfigBuilder("spark.ui.groupSQLSubExecutionEnabled")
+ .doc("Whether to group sub executions together in SQL UI when they belong to the same " +
+ "root execution")
+ .version("3.4.0")
+ .booleanConf
+ .createWithDefault(true)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index bb4cea474fe..90468b18a99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.Utils
object SQLExecution {
val EXECUTION_ID_KEY = "spark.sql.execution.id"
+ val EXECUTION_ROOT_ID_KEY = "spark.sql.execution.root.id"
private val _nextExecutionId = new AtomicLong(0)
@@ -67,6 +68,13 @@ object SQLExecution {
val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
val executionId = SQLExecution.nextExecutionId
sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
+ // Track the "root" SQL Execution Id for nested/sub queries. The current execution is the
+ // root execution if the root execution ID is null.
+ // And for the root execution, rootExecutionId == executionId.
+ if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == null) {
+ sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, executionId.toString)
+ }
+ val rootExecutionId = sc.getLocalProperty(EXECUTION_ROOT_ID_KEY).toLong
executionIdToQueryExecution.put(executionId, queryExecution)
try {
// sparkContext.getCallSite() would first try to pick up any call site that was previously
@@ -98,6 +106,7 @@ object SQLExecution {
try {
sc.listenerBus.post(SparkListenerSQLExecutionStart(
executionId = executionId,
+ rootExecutionId = rootExecutionId,
description = desc,
details = callSite.longForm,
physicalPlanDescription = queryExecution.explainString(planDescriptionMode),
@@ -140,6 +149,11 @@ object SQLExecution {
} finally {
executionIdToQueryExecution.remove(executionId)
sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId)
+ // Unset the "root" SQL Execution Id once the "root" SQL execution completes.
+ // The current execution is the root execution if rootExecutionId == executionId.
+ if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == executionId.toString) {
+ sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, null)
+ }
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
index a7adc9431c3..cd8f31b3c21 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -26,6 +26,7 @@ import scala.xml.{Node, NodeSeq}
import org.apache.spark.JobExecutionStatus
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.UI.UI_SQL_GROUP_SUB_EXECUTION_ENABLED
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage}
import org.apache.spark.util.Utils
@@ -33,33 +34,55 @@ import org.apache.spark.util.Utils
private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with Logging {
private val sqlStore = parent.sqlStore
+ private val groupSubExecutionEnabled = parent.conf.get(UI_SQL_GROUP_SUB_EXECUTION_ENABLED)
override def render(request: HttpServletRequest): Seq[Node] = {
val currentTime = System.currentTimeMillis()
val running = new mutable.ArrayBuffer[SQLExecutionUIData]()
val completed = new mutable.ArrayBuffer[SQLExecutionUIData]()
val failed = new mutable.ArrayBuffer[SQLExecutionUIData]()
+ val executionIdToSubExecutions =
+ new mutable.HashMap[Long, mutable.ArrayBuffer[SQLExecutionUIData]]()
sqlStore.executionsList().foreach { e =>
- if (e.errorMessage.isDefined) {
- if (e.errorMessage.get.isEmpty) {
- completed += e
+ def processExecution(e: SQLExecutionUIData): Unit = {
+ if (e.errorMessage.isDefined) {
+ if (e.errorMessage.get.isEmpty) {
+ completed += e
+ } else {
+ failed += e
+ }
+ } else if (e.completionTime.isEmpty) {
+ running += e
} else {
- failed += e
+ // When `completionTime` is present, it means the query execution is completed and
+ // `errorMessage` should be present as well. However, events generated by old versions of
+ // Spark do not have the `errorMessage` field. We have to check the status of this query
+ // execution's jobs.
+ val isFailed = e.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED }
+ if (isFailed) {
+ failed += e
+ } else {
+ completed += e
+ }
}
- } else if (e.completionTime.isEmpty) {
- running += e
+ }
+ // group the sub execution only if the root execution will be displayed (i.e. not missing)
+ if (groupSubExecutionEnabled &&
+ e.executionId != e.rootExecutionId &&
+ executionIdToSubExecutions.contains(e.rootExecutionId)) {
+ executionIdToSubExecutions(e.rootExecutionId) += e
} else {
- // When `completionTime` is present, it means the query execution is completed and
- // `errorMessage` should be present as well. However, events generated by old versions of
- // Spark do not have the `errorMessage` field. We have to check the status of this query
- // execution's jobs.
- val isFailed = e.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED }
- if (isFailed) {
- failed += e
- } else {
- completed += e
+ if (groupSubExecutionEnabled) {
+ // add the execution id to indicate it'll be displayed as root, so the executions with
+ // the same root execution id will be added here and displayed as sub execution.
+ // If the root execution is not found (e.g. event loss), then the sub executions will
+ // be displayed in the root list instead.
+ // NOTE: this code assumes the root execution id always comes first, which is guaranteed
+ // by the `sqlStore.executionsList()`
+ executionIdToSubExecutions(e.executionId) = new mutable.ArrayBuffer[SQLExecutionUIData]()
}
+ processExecution(e)
}
}
@@ -68,7 +91,8 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
if (running.nonEmpty) {
val runningPageTable =
- executionsTable(request, "running", running.toSeq, currentTime, true, true, true)
+ executionsTable(request, "running", running.toSeq,
+ executionIdToSubExecutions.mapValues(_.toSeq).toMap, currentTime, true, true, true)
_content ++=
<span id="running" class="collapse-aggregated-runningExecutions
collapse-table"
@@ -86,7 +110,8 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
if (completed.nonEmpty) {
val completedPageTable =
- executionsTable(request, "completed", completed.toSeq, currentTime, false, true, false)
+ executionsTable(request, "completed", completed.toSeq,
+ executionIdToSubExecutions.mapValues(_.toSeq).toMap, currentTime, false, true, false)
_content ++=
<span id="completed" class="collapse-aggregated-completedExecutions
collapse-table"
@@ -104,7 +129,8 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
if (failed.nonEmpty) {
val failedPageTable =
- executionsTable(request, "failed", failed.toSeq, currentTime, false, true, true)
+ executionsTable(request, "failed", failed.toSeq,
+ executionIdToSubExecutions.mapValues(_.toSeq).toMap, currentTime, false, true, true)
_content ++=
<span id="failed" class="collapse-aggregated-failedExecutions
collapse-table"
@@ -164,6 +190,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
request: HttpServletRequest,
executionTag: String,
executionData: Seq[SQLExecutionUIData],
+ executionIdToSubExecutions: Map[Long, Seq[SQLExecutionUIData]],
currentTime: Long,
showRunningJobs: Boolean,
showSucceededJobs: Boolean,
@@ -186,7 +213,8 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
currentTime,
showRunningJobs,
showSucceededJobs,
- showFailedJobs).table(executionPage)
+ showFailedJobs,
+ executionIdToSubExecutions).table(executionPage)
} catch {
case e@(_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
<div class="alert alert-error">
@@ -210,7 +238,9 @@ private[ui] class ExecutionPagedTable(
currentTime: Long,
showRunningJobs: Boolean,
showSucceededJobs: Boolean,
- showFailedJobs: Boolean) extends PagedTable[ExecutionTableRowData] {
+ showFailedJobs: Boolean,
+ subExecutions: Map[Long, Seq[SQLExecutionUIData]] = Map.empty)
+ extends PagedTable[ExecutionTableRowData] {
private val (sortColumn, desc, pageSize) = getTableParameters(request, executionTag, "ID")
@@ -224,11 +254,14 @@ private[ui] class ExecutionPagedTable(
desc,
showRunningJobs,
showSucceededJobs,
- showFailedJobs)
+ showFailedJobs,
+ subExecutions)
private val parameterPath =
s"$basePath/$subPath/?${getParameterOtherTable(request, executionTag)}"
+ private val showSubExecutions = subExecutions.nonEmpty
+
override def tableId: String = s"$executionTag-table"
override def tableCssClass: String =
@@ -250,32 +283,39 @@ private[ui] class ExecutionPagedTable(
override def goButtonFormPath: String =
s"$parameterPath&$executionTag.sort=$encodedSortColumn&$executionTag.desc=$desc#$tableHeaderId"
- override def headers: Seq[Node] = {
- // Information for each header: title, sortable, tooltip
- val executionHeadersAndCssClasses: Seq[(String, Boolean, Option[String])] =
- Seq(
- ("ID", true, None),
- ("Description", true, None),
- ("Submitted", true, None),
- ("Duration", true, Some("Time from query submission to completion (or
if still executing," +
- "time since submission)"))) ++ {
- if (showRunningJobs && showSucceededJobs && showFailedJobs) {
- Seq(
- ("Running Job IDs", true, None),
- ("Succeeded Job IDs", true, None),
- ("Failed Job IDs", true, None))
- } else if (showSucceededJobs && showFailedJobs) {
- Seq(
- ("Succeeded Job IDs", true, None),
- ("Failed Job IDs", true, None))
- } else {
- Seq(("Job IDs", true, None))
- }
+ // Information for each header: title, sortable, tooltip
+ private val headerInfo: Seq[(String, Boolean, Option[String])] = {
+ Seq(
+ ("ID", true, None),
+ ("Description", true, None),
+ ("Submitted", true, None),
+ ("Duration", true, Some("Time from query submission to completion (or if
still executing," +
+ " time since submission)"))) ++ {
+ if (showRunningJobs && showSucceededJobs && showFailedJobs) {
+ Seq(
+ ("Running Job IDs", true, None),
+ ("Succeeded Job IDs", true, None),
+ ("Failed Job IDs", true, None))
+ } else if (showSucceededJobs && showFailedJobs) {
+ Seq(
+ ("Succeeded Job IDs", true, None),
+ ("Failed Job IDs", true, None))
+ } else {
+ Seq(("Job IDs", true, None))
}
+ } ++ {
+ if (showSubExecutions) {
+ Seq(("Sub Execution IDs", true, None))
+ } else {
+ Nil
+ }
+ }
+ }
- isSortColumnValid(executionHeadersAndCssClasses, sortColumn)
+ override def headers: Seq[Node] = {
+ isSortColumnValid(headerInfo, sortColumn)
- headerRow(executionHeadersAndCssClasses, desc, pageSize, sortColumn, parameterPath,
+ headerRow(headerInfo, desc, pageSize, sortColumn, parameterPath,
executionTag, tableHeaderId)
}
@@ -290,35 +330,119 @@ private[ui] class ExecutionPagedTable(
}
}
- <tr>
- <td>
- {executionUIData.executionId.toString}
- </td>
- <td>
- {descriptionCell(executionUIData)}
- </td>
- <td sorttable_customkey={submissionTime.toString}>
- {UIUtils.formatDate(submissionTime)}
- </td>
- <td sorttable_customkey={duration.toString}>
- {UIUtils.formatDuration(duration)}
- </td>
- {if (showRunningJobs) {
+ def executionLinks(executionData: Seq[Long]): Seq[Node] = {
+ val details = if (executionData.nonEmpty) {
+ val onClickScript = "this.parentNode.parentNode.nextElementSibling.nextElementSibling" +
+ ".classList.toggle('collapsed')"
+ <span onclick={onClickScript} class="expand-details">
+ +details
+ </span>
+ } else {
+ Nil
+ }
+
+ <div>
+ {
+ executionData.map { executionId =>
+ <a href={executionURL(executionId)}>[{executionId.toString}]</a>
+ }
+ }
+ </div> ++ details
+ }
+
+ val baseRow: Seq[Node] = {
+ <tr>
<td>
- {jobLinks(executionTableRow.runningJobData)}
+ {executionUIData.executionId.toString}
</td>
- }}
- {if (showSucceededJobs) {
<td>
- {jobLinks(executionTableRow.completedJobData)}
+ {descriptionCell(executionUIData)}
</td>
- }}
- {if (showFailedJobs) {
- <td>
- {jobLinks(executionTableRow.failedJobData)}
+ <td sorttable_customkey={submissionTime.toString}>
+ {UIUtils.formatDate(submissionTime)}
+ </td>
+ <td sorttable_customkey={duration.toString}>
+ {UIUtils.formatDuration(duration)}
+ </td>
+ {if (showRunningJobs) {
+ <td>
+ {jobLinks(executionTableRow.runningJobData)}
+ </td>
+ }}
+ {if (showSucceededJobs) {
+ <td>
+ {jobLinks(executionTableRow.completedJobData)}
+ </td>
+ }}
+ {if (showFailedJobs) {
+ <td>
+ {jobLinks(executionTableRow.failedJobData)}
+ </td>
+ }}
+ {if (showSubExecutions) {
+ <td>
+ {executionLinks(executionTableRow.subExecutionData.map(_.executionUIData.executionId))}
+ </td>
+ }}
+ </tr>
+ }
+
+ val subRow: Seq[Node] = if (executionTableRow.subExecutionData.nonEmpty) {
+ <tr></tr>
+ <tr class="sub-execution-list collapsed">
+ <td></td>
+ <td colspan={s"${headerInfo.length - 1}"}>
+ <table class="table table-bordered table-sm table-cell-width-limited">
+ <thead>
+ <tr>
+ {headerInfo.dropRight(1).map(info => <th>{info._1}</th>)}
+ </tr>
+ </thead>
+ <tbody>
+ {
+ executionTableRow.subExecutionData.map { rowData =>
+ val executionUIData = rowData.executionUIData
+ val submissionTime = executionUIData.submissionTime
+ val duration = rowData.duration
+ <tr>
+ <td>
+ {executionUIData.executionId.toString}
+ </td>
+ <td>
+ {descriptionCell(executionUIData)}
+ </td>
+ <td sorttable_customkey={submissionTime.toString}>
+ {UIUtils.formatDate(submissionTime)}
+ </td>
+ <td sorttable_customkey={duration.toString}>
+ {UIUtils.formatDuration(duration)}
+ </td>
+ {if (showRunningJobs) {
+ <td>
+ {jobLinks(rowData.runningJobData)}
+ </td>
+ }}
+ {if (showSucceededJobs) {
+ <td>
+ {jobLinks(rowData.completedJobData)}
+ </td>
+ }}
+ {if (showFailedJobs) {
+ <td>
+ {jobLinks(rowData.failedJobData)}
+ </td>
+ }}
+ </tr>
+ }
+ }
+ </tbody>
+ </table>
</td>
- }}
- </tr>
+ </tr>
+ } else {
+ Nil
+ }
+ baseRow ++ subRow
}
private def descriptionCell(execution: SQLExecutionUIData): Seq[Node] = {
@@ -358,7 +482,8 @@ private[ui] class ExecutionTableRowData(
val executionUIData: SQLExecutionUIData,
val runningJobData: Seq[Int],
val completedJobData: Seq[Int],
- val failedJobData: Seq[Int])
+ val failedJobData: Seq[Int],
+ val subExecutionData: Seq[ExecutionTableRowData])
private[ui] class ExecutionDataSource(
@@ -369,7 +494,9 @@ private[ui] class ExecutionDataSource(
desc: Boolean,
showRunningJobs: Boolean,
showSucceededJobs: Boolean,
- showFailedJobs: Boolean) extends PagedDataSource[ExecutionTableRowData](pageSize) {
+ showFailedJobs: Boolean,
+ subExecutions: Map[Long, Seq[SQLExecutionUIData]])
+ extends PagedDataSource[ExecutionTableRowData](pageSize) {
// Convert ExecutionData to ExecutionTableRowData which contains the final contents to show
// in the table so that we can avoid creating duplicate contents during sorting the data
@@ -401,12 +528,18 @@ private[ui] class ExecutionDataSource(
}.map { case (jobId, _) => jobId }.toSeq.sorted
} else Seq.empty
+ val executions = subExecutions.get(executionUIData.executionId) match {
+ case Some(executions) => executions.map(executionRow)
+ case _ => Seq.empty
+ }
+
new ExecutionTableRowData(
duration,
executionUIData,
runningJobData,
completedJobData,
- failedJobData)
+ failedJobData,
+ executions)
}
/** Return Ordering according to sortColumn and desc. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 64c652542ff..32b215b1c2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -90,6 +90,7 @@ class SQLAppStatusListener(
// data corresponding to the execId.
val sqlStoreData = kvstore.read(classOf[SQLExecutionUIData], executionId)
val executionData = new LiveExecutionData(executionId)
+ executionData.rootExecutionId = sqlStoreData.rootExecutionId
executionData.description = sqlStoreData.description
executionData.details = sqlStoreData.details
executionData.physicalPlanDescription = sqlStoreData.physicalPlanDescription
@@ -340,7 +341,7 @@ class SQLAppStatusListener(
}
private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
- val SparkListenerSQLExecutionStart(executionId, description, details,
+ val SparkListenerSQLExecutionStart(executionId, rootExecutionId, description, details,
physicalPlanDescription, sparkPlanInfo, time, modifiedConfigs) = event
val planGraph = SparkPlanGraph(sparkPlanInfo)
@@ -355,6 +356,7 @@ class SQLAppStatusListener(
kvstore.write(graphToStore)
val exec = getOrCreateExecution(executionId)
+ exec.rootExecutionId = rootExecutionId
exec.description = description
exec.details = details
exec.physicalPlanDescription = physicalPlanDescription
@@ -483,6 +485,7 @@ class SQLAppStatusListener(
private class LiveExecutionData(val executionId: Long) extends LiveEntity {
+ var rootExecutionId: Long = _
var description: String = null
var details: String = null
var physicalPlanDescription: String = null
@@ -505,6 +508,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
override protected def doUpdate(): Any = {
new SQLExecutionUIData(
executionId,
+ rootExecutionId,
description,
details,
physicalPlanDescription,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
index 46827aaa1d9..6c92b98ca3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -83,6 +83,7 @@ class SQLAppStatusStore(
class SQLExecutionUIData(
@KVIndexParam val executionId: Long,
+ val rootExecutionId: Long,
val description: String,
val details: String,
val physicalPlanDescription: String,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index ce665e11893..b931b4fcde1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -43,6 +43,8 @@ case class SparkListenerSQLAdaptiveSQLMetricUpdates(
@DeveloperApi
case class SparkListenerSQLExecutionStart(
executionId: Long,
+ // if the execution is a root, then rootExecutionId == executionId
+ rootExecutionId: Long,
description: String,
details: String,
physicalPlanDescription: String,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
index 8a258fb1219..5aa6ddbdb7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
@@ -17,12 +17,15 @@
package org.apache.spark.sql.execution.ui
+import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.ui.{SparkUI, SparkUITab}
class SQLTab(val sqlStore: SQLAppStatusStore, sparkUI: SparkUI)
extends SparkUITab(sparkUI, "SQL") with Logging {
+ def conf: SparkConf = sparkUI.conf
+
override val name = "SQL / DataFrame"
val parent = sparkUI
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
index 09cef9663c0..1ccaf5c68c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
@@ -80,6 +80,7 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe {
new SQLExecutionUIData(
executionId = ui.getExecutionId,
+ rootExecutionId = ui.getRootExecutionId,
description = ui.getDescription,
details = ui.getDetails,
physicalPlanDescription = ui.getPhysicalPlanDescription,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
index 9f2b08a65ea..e9d98ee9715 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
@@ -59,7 +59,7 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession {
val reconstructedEvent = JsonProtocol.sparkEventFromJson(SQLExecutionStartJsonString)
if (newExecutionStartEvent) {
- val expectedEvent = SparkListenerSQLExecutionStart(0, "test desc", "test detail",
+ val expectedEvent = SparkListenerSQLExecutionStart(0, 0, "test desc", "test detail",
"test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0,
Map("k1" -> "v1"))
assert(reconstructedEvent == expectedEvent)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala
index 090c149886a..42b27bd9f28 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala
@@ -57,7 +57,7 @@ class SQLEventFilterBuilderSuite extends SparkFunSuite {
}
// Start SQL Execution
- listener.onOtherEvent(SparkListenerSQLExecutionStart(1, "desc1", "details1", "plan",
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(1, 1, "desc1", "details1", "plan",
new SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty), time, Map.empty))
time += 1
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala
index 724df8ebe8b..f1b77e502df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala
@@ -41,7 +41,7 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite {
val acceptFn = filter.acceptFn().lift
// Verifying with finished SQL execution 1
- assert(Some(false) === acceptFn(SparkListenerSQLExecutionStart(1, "description1", "details1",
+ assert(Some(false) === acceptFn(SparkListenerSQLExecutionStart(1, 1, "description1", "details1",
"plan", null, 0, Map.empty)))
assert(Some(false) === acceptFn(SparkListenerSQLExecutionEnd(1, 0)))
assert(Some(false) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(1, "plan", null)))
@@ -88,7 +88,7 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite {
}
// Verifying with live SQL execution 2
- assert(Some(true) === acceptFn(SparkListenerSQLExecutionStart(2, "description2", "details2",
+ assert(Some(true) === acceptFn(SparkListenerSQLExecutionStart(2, 2, "description2", "details2",
"plan", null, 0, Map.empty)))
assert(Some(true) === acceptFn(SparkListenerSQLExecutionEnd(2, 0)))
assert(Some(true) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(2, "plan", null)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
index a5368711260..7af58867f33 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
@@ -28,6 +28,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR
+import org.apache.spark.internal.config.UI.UI_SQL_GROUP_SUB_EXECUTION_ENABLED
import org.apache.spark.scheduler.{JobFailed, SparkListenerJobEnd, SparkListenerJobStart}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
@@ -60,6 +61,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
when(tab.sqlStore).thenReturn(statusStore)
val request = mock(classOf[HttpServletRequest])
+ when(tab.conf).thenReturn(new SparkConf(false))
when(tab.appName).thenReturn("testing")
when(tab.headerTabs).thenReturn(Seq.empty)
@@ -74,6 +76,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
when(tab.sqlStore).thenReturn(statusStore)
val request = mock(classOf[HttpServletRequest])
+ when(tab.conf).thenReturn(new SparkConf(false))
when(tab.appName).thenReturn("testing")
when(tab.headerTabs).thenReturn(Seq.empty)
@@ -82,6 +85,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
val page = new AllExecutionsPage(tab)
val df = createTestDataFrame
listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ 0,
0,
"test",
"test",
@@ -109,6 +113,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
val tab = mock(classOf[SQLTab], RETURNS_SMART_NULLS)
val request = mock(classOf[HttpServletRequest])
+ when(tab.conf).thenReturn(new SparkConf(false))
when(tab.sqlStore).thenReturn(statusStore)
when(tab.appName).thenReturn("testing")
when(tab.headerTabs).thenReturn(Seq.empty)
@@ -121,6 +126,50 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
assert(html.contains("duration"))
}
+ test("group sub executions") {
+ val statusStore = createStatusStore
+ val tab = mock(classOf[SQLTab], RETURNS_SMART_NULLS)
+ val request = mock(classOf[HttpServletRequest])
+
+ val sparkConf = new SparkConf(false).set(UI_SQL_GROUP_SUB_EXECUTION_ENABLED, true)
+ when(tab.conf).thenReturn(sparkConf)
+ when(tab.sqlStore).thenReturn(statusStore)
+ when(tab.appName).thenReturn("testing")
+ when(tab.headerTabs).thenReturn(Seq.empty)
+
+ val listener = statusStore.listener.get
+ val page = new AllExecutionsPage(tab)
+ val df = createTestDataFrame
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ 0,
+ 0,
+ "test",
+ "test",
+ df.queryExecution.toString,
+ SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+ System.currentTimeMillis()))
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ 1,
+ 0,
+ "test",
+ "test",
+ df.queryExecution.toString,
+ SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+ System.currentTimeMillis()))
+ // sub execution has a missing root execution
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ 2,
+ 100,
+ "test",
+ "test",
+ df.queryExecution.toString,
+ SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+ System.currentTimeMillis()))
+ val html = page.render(request).toString().toLowerCase(Locale.ROOT)
+ assert(html.contains("sub execution ids") && html.contains("sub-execution-list"))
+ // sub execution should still be displayed if the root execution is missing
+ assert(html.contains("id=2"))
+ }
protected def createStatusStore: SQLAppStatusStore
@@ -146,6 +195,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
Seq(0, 1).foreach { executionId =>
val df = createTestDataFrame
listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ executionId,
executionId,
"test",
"test",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
index d994126fe63..3b9efb18057 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
@@ -74,6 +74,7 @@ object MetricsAggregationBenchmark extends BenchmarkBase {
val idgen = new AtomicInteger()
val executionId = idgen.incrementAndGet()
val executionStart = SparkListenerSQLExecutionStart(
+ executionId,
executionId,
getClass().getName(),
getClass().getName(),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 3d80935fdda..26bb35cf0e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -190,6 +190,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
}.toMap
listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ executionId,
executionId,
"test",
"test",
@@ -343,7 +344,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
val listener = new SparkListener {
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
- case SparkListenerSQLExecutionStart(_, _, _, planDescription, _, _, _) =>
+ case SparkListenerSQLExecutionStart(_, _, _, _, planDescription, _, _, _) =>
assert(expected.forall(planDescription.contains))
checkDone = true
case _ => // ignore other events
@@ -380,6 +381,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
val executionId = 0
val df = createTestDataFrame
listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ executionId,
executionId,
"test",
"test",
@@ -410,6 +412,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
val executionId = 0
val df = createTestDataFrame
listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ executionId,
executionId,
"test",
"test",
@@ -451,6 +454,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
val executionId = 0
val df = createTestDataFrame
listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ executionId,
executionId,
"test",
"test",
@@ -481,6 +485,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
val executionId = 0
val df = createTestDataFrame
listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ executionId,
executionId,
"test",
"test",
@@ -512,6 +517,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
val executionId = 0
val df = createTestDataFrame
listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ executionId,
executionId,
"test",
"test",
@@ -652,6 +658,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
// Start execution 1 and execution 2
time += 1
listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ 1,
1,
"test",
"test",
@@ -661,6 +668,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
Map.empty))
time += 1
listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ 2,
2,
"test",
"test",
@@ -678,6 +686,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
// Start execution 3 and execution 2 should be evicted.
time += 1
listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ 3,
3,
"test",
"test",
@@ -714,6 +723,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
.allNodes.flatMap(_.metrics.map(_.accumulatorId))
listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ executionId,
executionId,
"test",
"test",
diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
index f6fd1fc42ce..a0154d724da 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
@@ -82,6 +82,7 @@ object SqlResourceSuite {
new SQLExecutionUIData(
executionId = 0,
+ rootExecutionId = 1,
description = DESCRIPTION,
details = "",
physicalPlanDescription = PLAN_DESCRIPTION,
diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
index f7b783ef3ca..ddc693f1ee3 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -52,6 +52,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
val input1 = new SQLExecutionUIData(
executionId = templateData.executionId,
+ rootExecutionId = templateData.rootExecutionId,
description = templateData.description,
details = templateData.details,
physicalPlanDescription = templateData.physicalPlanDescription,
@@ -71,6 +72,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
val input2 = new SQLExecutionUIData(
executionId = templateData.executionId,
+ rootExecutionId = templateData.rootExecutionId,
description = templateData.description,
details = templateData.details,
physicalPlanDescription = templateData.physicalPlanDescription,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]