This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c124037b975 [SPARK-41752][SQL][UI] Group nested executions under the root execution
c124037b975 is described below

commit c124037b97538b2656d29ce547b2a42209a41703
Author: Linhong Liu <[email protected]>
AuthorDate: Tue Jan 10 20:52:15 2023 +0800

    [SPARK-41752][SQL][UI] Group nested executions under the root execution
    
    ### What changes were proposed in this pull request?
    This PR proposes to group all sub-executions together in the SQL UI if they belong to the same root execution.
    
    This feature is controlled by the conf `spark.ui.groupSQLSubExecutionEnabled`, which defaults to `true`.
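    
    For illustration only (not part of this patch), a minimal sketch of disabling the grouping at session startup; the builder setup and app name below are assumptions, not code from this PR:
    ```
    // Hypothetical sketch: this is a static UI conf, so set it before the
    // SparkContext (and its web UI) starts rather than toggling it at runtime.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("grouping-demo") // arbitrary name for the sketch
      .config("spark.ui.groupSQLSubExecutionEnabled", "false")
      .getOrCreate()
    ```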
    
    We can have some follow-up improvements after this PR:
    1. Add links to the SQL page and Job page to indicate the root execution ID.
    2. Better handling for the case where the root execution is missing (e.g. evicted due to the retaining limit); in this PR, such sub-executions are displayed ungrouped.
    
    ### Why are the changes needed?
    Better user experience.
    
    In PR #39220, a CTAS query triggers a sub-execution to perform the data insertion, but the current UI displays the two executions separately, which may confuse users.
    In addition, this change should also help structured streaming cases.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, screenshots of the UI change are shown below.
    SQL Query:
    ```
    CREATE TABLE t USING PARQUET AS SELECT 'a' as a, 1 as b
    ```
    UI before this PR:
    <img width="1074" alt="Screen Shot 2022-12-28 at 4 42 08 PM" src="https://user-images.githubusercontent.com/67896261/209889679-83909bc9-0e15-4ff1-9aeb-3118e4bab524.png">
    
    UI after this PR with sub-executions collapsed:
    <img width="1072" alt="Screen Shot 2022-12-28 at 4 44 32 PM" src="https://user-images.githubusercontent.com/67896261/209889688-973a4ec9-a5dc-4a8b-8618-c0800733fffa.png">
    
    UI after this PR with a sub-execution expanded:
    <img width="1069" alt="Screen Shot 2022-12-28 at 4 44 41 PM" src="https://user-images.githubusercontent.com/67896261/209889718-0e24be12-23d6-4f81-a508-15eac62ec231.png">
    
    ### How was this patch tested?
    Unit tests.
    
    Closes #39268 from linhongliu-db/SPARK-41752.
    
    Authored-by: Linhong Liu <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../apache/spark/status/protobuf/store_types.proto |   1 +
 .../resources/org/apache/spark/ui/static/webui.css |  12 +
 .../org/apache/spark/internal/config/UI.scala      |   7 +
 .../apache/spark/sql/execution/SQLExecution.scala  |  14 ++
 .../spark/sql/execution/ui/AllExecutionsPage.scala | 275 +++++++++++++++------
 .../sql/execution/ui/SQLAppStatusListener.scala    |   6 +-
 .../spark/sql/execution/ui/SQLAppStatusStore.scala |   1 +
 .../spark/sql/execution/ui/SQLListener.scala       |   2 +
 .../org/apache/spark/sql/execution/ui/SQLTab.scala |   3 +
 .../sql/SQLExecutionUIDataSerializer.scala         |   1 +
 .../spark/sql/execution/SQLJsonProtocolSuite.scala |   2 +-
 .../history/SQLEventFilterBuilderSuite.scala       |   2 +-
 .../history/SQLLiveEntitiesEventFilterSuite.scala  |   4 +-
 .../sql/execution/ui/AllExecutionsPageSuite.scala  |  50 ++++
 .../execution/ui/MetricsAggregationBenchmark.scala |   1 +
 .../execution/ui/SQLAppStatusListenerSuite.scala   |  12 +-
 .../spark/status/api/v1/sql/SqlResourceSuite.scala |   1 +
 .../sql/KVStoreProtobufSerializerSuite.scala       |   2 +
 18 files changed, 319 insertions(+), 77 deletions(-)

diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 04c857f7c3c..e9aaad261f9 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -417,6 +417,7 @@ message SQLExecutionUIData {
   repeated int64 stages = 11;
   bool metric_values_is_null = 12;
   map<int64, string> metric_values = 13;
+  optional int64 root_execution_id = 14;
 }
 
 message SparkPlanGraphNode {
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index 0252bc80047..f952f86503e 100755
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -187,6 +187,18 @@ pre {
   display: none;
 }
 
+.sub-execution-list {
+  font-size: 0.9rem;
+}
+
+.sub-execution-list.collapsed {
+  display: none;
+}
+
+.table-striped .sub-execution-list table tr {
+  background-color: inherit;
+}
+
 .description-input {
   overflow: hidden;
   text-overflow: ellipsis;
diff --git a/core/src/main/scala/org/apache/spark/internal/config/UI.scala b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
index d09620b8e34..a32e60de2a4 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/UI.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
@@ -229,4 +229,11 @@ private[spark] object UI {
     .stringConf
     .transform(_.toUpperCase(Locale.ROOT))
     .createWithDefault("LOCAL")
+
+  val UI_SQL_GROUP_SUB_EXECUTION_ENABLED = ConfigBuilder("spark.ui.groupSQLSubExecutionEnabled")
+    .doc("Whether to group sub executions together in SQL UI when they belong to the same " +
+      "root execution")
+    .version("3.4.0")
+    .booleanConf
+    .createWithDefault(true)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index bb4cea474fe..90468b18a99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.Utils
 object SQLExecution {
 
   val EXECUTION_ID_KEY = "spark.sql.execution.id"
+  val EXECUTION_ROOT_ID_KEY = "spark.sql.execution.root.id"
 
   private val _nextExecutionId = new AtomicLong(0)
 
@@ -67,6 +68,13 @@ object SQLExecution {
     val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
     val executionId = SQLExecution.nextExecutionId
     sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
+    // Track the "root" SQL Execution Id for nested/sub queries. The current 
execution is the
+    // root execution if the root execution ID is null.
+    // And for the root execution, rootExecutionId == executionId.
+    if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == null) {
+      sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, executionId.toString)
+    }
+    val rootExecutionId = sc.getLocalProperty(EXECUTION_ROOT_ID_KEY).toLong
     executionIdToQueryExecution.put(executionId, queryExecution)
     try {
      // sparkContext.getCallSite() would first try to pick up any call site that was previously
@@ -98,6 +106,7 @@ object SQLExecution {
         try {
           sc.listenerBus.post(SparkListenerSQLExecutionStart(
             executionId = executionId,
+            rootExecutionId = rootExecutionId,
             description = desc,
             details = callSite.longForm,
+            physicalPlanDescription = queryExecution.explainString(planDescriptionMode),
@@ -140,6 +149,11 @@ object SQLExecution {
     } finally {
       executionIdToQueryExecution.remove(executionId)
       sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId)
+      // Unset the "root" SQL Execution Id once the "root" SQL execution 
completes.
+      // The current execution is the root execution if rootExecutionId == 
executionId.
+      if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == executionId.toString) {
+        sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, null)
+      }
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
index a7adc9431c3..cd8f31b3c21 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -26,6 +26,7 @@ import scala.xml.{Node, NodeSeq}
 
 import org.apache.spark.JobExecutionStatus
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.UI.UI_SQL_GROUP_SUB_EXECUTION_ENABLED
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage}
 import org.apache.spark.util.Utils
@@ -33,33 +34,55 @@ import org.apache.spark.util.Utils
 private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with Logging {
 
   private val sqlStore = parent.sqlStore
+  private val groupSubExecutionEnabled = parent.conf.get(UI_SQL_GROUP_SUB_EXECUTION_ENABLED)
 
   override def render(request: HttpServletRequest): Seq[Node] = {
     val currentTime = System.currentTimeMillis()
     val running = new mutable.ArrayBuffer[SQLExecutionUIData]()
     val completed = new mutable.ArrayBuffer[SQLExecutionUIData]()
     val failed = new mutable.ArrayBuffer[SQLExecutionUIData]()
+    val executionIdToSubExecutions =
+      new mutable.HashMap[Long, mutable.ArrayBuffer[SQLExecutionUIData]]()
 
     sqlStore.executionsList().foreach { e =>
-      if (e.errorMessage.isDefined) {
-        if (e.errorMessage.get.isEmpty) {
-          completed += e
+      def processExecution(e: SQLExecutionUIData): Unit = {
+        if (e.errorMessage.isDefined) {
+          if (e.errorMessage.get.isEmpty) {
+            completed += e
+          } else {
+            failed += e
+          }
+        } else if (e.completionTime.isEmpty) {
+          running += e
         } else {
-          failed += e
+          // When `completionTime` is present, it means the query execution is completed and
+          // `errorMessage` should be present as well. However, events generated by old versions of
+          // Spark do not have the `errorMessage` field. We have to check the status of this query
+          // execution's jobs.
+          val isFailed = e.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED }
+          if (isFailed) {
+            failed += e
+          } else {
+            completed += e
+          }
         }
-      } else if (e.completionTime.isEmpty) {
-        running += e
+      }
+      // group the sub execution only if the root execution will be displayed (i.e. not missing)
+      if (groupSubExecutionEnabled &&
+          e.executionId != e.rootExecutionId &&
+          executionIdToSubExecutions.contains(e.rootExecutionId)) {
+        executionIdToSubExecutions(e.rootExecutionId) += e
       } else {
-        // When `completionTime` is present, it means the query execution is completed and
-        // `errorMessage` should be present as well. However, events generated by old versions of
-        // Spark do not have the `errorMessage` field. We have to check the status of this query
-        // execution's jobs.
-        val isFailed = e.jobs.exists { case (_, status) => status == JobExecutionStatus.FAILED }
-        if (isFailed) {
-          failed += e
-        } else {
-          completed += e
+        if (groupSubExecutionEnabled) {
+          // add the execution id to indicate it'll be displayed as root, so the executions with
+          // the same root execution id will be added here and displayed as sub execution.
+          // If the root execution is not found (e.g. event loss), then the sub executions will
+          // be displayed in the root list instead.
+          // NOTE: this code assumes the root execution id always comes first, which is guaranteed
+          // by the `sqlStore.executionsList()`
+          executionIdToSubExecutions(e.executionId) = new mutable.ArrayBuffer[SQLExecutionUIData]()
         }
+        processExecution(e)
       }
     }
 
@@ -68,7 +91,8 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
 
       if (running.nonEmpty) {
         val runningPageTable =
-          executionsTable(request, "running", running.toSeq, currentTime, 
true, true, true)
+          executionsTable(request, "running", running.toSeq,
+            executionIdToSubExecutions.mapValues(_.toSeq).toMap, currentTime, 
true, true, true)
 
         _content ++=
           <span id="running" class="collapse-aggregated-runningExecutions 
collapse-table"
@@ -86,7 +110,8 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
 
       if (completed.nonEmpty) {
         val completedPageTable =
-          executionsTable(request, "completed", completed.toSeq, currentTime, 
false, true, false)
+          executionsTable(request, "completed", completed.toSeq,
+            executionIdToSubExecutions.mapValues(_.toSeq).toMap, currentTime, 
false, true, false)
 
         _content ++=
           <span id="completed" class="collapse-aggregated-completedExecutions 
collapse-table"
@@ -104,7 +129,8 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
 
       if (failed.nonEmpty) {
         val failedPageTable =
-          executionsTable(request, "failed", failed.toSeq, currentTime, false, 
true, true)
+          executionsTable(request, "failed", failed.toSeq,
+            executionIdToSubExecutions.mapValues(_.toSeq).toMap, currentTime, 
false, true, true)
 
         _content ++=
           <span id="failed" class="collapse-aggregated-failedExecutions 
collapse-table"
@@ -164,6 +190,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
     request: HttpServletRequest,
     executionTag: String,
     executionData: Seq[SQLExecutionUIData],
+    executionIdToSubExecutions: Map[Long, Seq[SQLExecutionUIData]],
     currentTime: Long,
     showRunningJobs: Boolean,
     showSucceededJobs: Boolean,
@@ -186,7 +213,8 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
         currentTime,
         showRunningJobs,
         showSucceededJobs,
-        showFailedJobs).table(executionPage)
+        showFailedJobs,
+        executionIdToSubExecutions).table(executionPage)
     } catch {
       case e@(_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
         <div class="alert alert-error">
@@ -210,7 +238,9 @@ private[ui] class ExecutionPagedTable(
     currentTime: Long,
     showRunningJobs: Boolean,
     showSucceededJobs: Boolean,
-    showFailedJobs: Boolean) extends PagedTable[ExecutionTableRowData] {
+    showFailedJobs: Boolean,
+    subExecutions: Map[Long, Seq[SQLExecutionUIData]] = Map.empty)
+  extends PagedTable[ExecutionTableRowData] {
 
   private val (sortColumn, desc, pageSize) = getTableParameters(request, executionTag, "ID")
 
@@ -224,11 +254,14 @@ private[ui] class ExecutionPagedTable(
     desc,
     showRunningJobs,
     showSucceededJobs,
-    showFailedJobs)
+    showFailedJobs,
+    subExecutions)
 
   private val parameterPath =
     s"$basePath/$subPath/?${getParameterOtherTable(request, executionTag)}"
 
+  private val showSubExecutions = subExecutions.nonEmpty
+
   override def tableId: String = s"$executionTag-table"
 
   override def tableCssClass: String =
@@ -250,32 +283,39 @@ private[ui] class ExecutionPagedTable(
   override def goButtonFormPath: String =
     s"$parameterPath&$executionTag.sort=$encodedSortColumn&$executionTag.desc=$desc#$tableHeaderId"
 
-  override def headers: Seq[Node] = {
-    // Information for each header: title, sortable, tooltip
-    val executionHeadersAndCssClasses: Seq[(String, Boolean, Option[String])] =
-      Seq(
-        ("ID", true, None),
-        ("Description", true, None),
-        ("Submitted", true, None),
-        ("Duration", true, Some("Time from query submission to completion (or 
if still executing," +
-          "time since submission)"))) ++ {
-        if (showRunningJobs && showSucceededJobs && showFailedJobs) {
-          Seq(
-            ("Running Job IDs", true, None),
-            ("Succeeded Job IDs", true, None),
-            ("Failed Job IDs", true, None))
-        } else if (showSucceededJobs && showFailedJobs) {
-          Seq(
-            ("Succeeded Job IDs", true, None),
-            ("Failed Job IDs", true, None))
-        } else {
-          Seq(("Job IDs", true, None))
-        }
+  // Information for each header: title, sortable, tooltip
+  private val headerInfo: Seq[(String, Boolean, Option[String])] = {
+    Seq(
+      ("ID", true, None),
+      ("Description", true, None),
+      ("Submitted", true, None),
+      ("Duration", true, Some("Time from query submission to completion (or if 
still executing," +
+        " time since submission)"))) ++ {
+      if (showRunningJobs && showSucceededJobs && showFailedJobs) {
+        Seq(
+          ("Running Job IDs", true, None),
+          ("Succeeded Job IDs", true, None),
+          ("Failed Job IDs", true, None))
+      } else if (showSucceededJobs && showFailedJobs) {
+        Seq(
+          ("Succeeded Job IDs", true, None),
+          ("Failed Job IDs", true, None))
+      } else {
+        Seq(("Job IDs", true, None))
       }
+    } ++ {
+      if (showSubExecutions) {
+        Seq(("Sub Execution IDs", true, None))
+      } else {
+        Nil
+      }
+    }
+  }
 
-    isSortColumnValid(executionHeadersAndCssClasses, sortColumn)
+  override def headers: Seq[Node] = {
+    isSortColumnValid(headerInfo, sortColumn)
 
-    headerRow(executionHeadersAndCssClasses, desc, pageSize, sortColumn, parameterPath,
+    headerRow(headerInfo, desc, pageSize, sortColumn, parameterPath,
       executionTag, tableHeaderId)
   }
 
@@ -290,35 +330,119 @@ private[ui] class ExecutionPagedTable(
       }
     }
 
-    <tr>
-      <td>
-        {executionUIData.executionId.toString}
-      </td>
-      <td>
-        {descriptionCell(executionUIData)}
-      </td>
-      <td sorttable_customkey={submissionTime.toString}>
-        {UIUtils.formatDate(submissionTime)}
-      </td>
-      <td sorttable_customkey={duration.toString}>
-        {UIUtils.formatDuration(duration)}
-      </td>
-      {if (showRunningJobs) {
+    def executionLinks(executionData: Seq[Long]): Seq[Node] = {
+      val details = if (executionData.nonEmpty) {
+        val onClickScript = "this.parentNode.parentNode.nextElementSibling.nextElementSibling" +
+          ".classList.toggle('collapsed')"
+        <span onclick={onClickScript} class="expand-details">
+          +details
+        </span>
+      } else {
+        Nil
+      }
+
+      <div>
+        {
+          executionData.map { executionId =>
+            <a href={executionURL(executionId)}>[{executionId.toString}]</a>
+          }
+        }
+      </div> ++ details
+    }
+
+    val baseRow: Seq[Node] = {
+      <tr>
         <td>
-          {jobLinks(executionTableRow.runningJobData)}
+          {executionUIData.executionId.toString}
         </td>
-      }}
-      {if (showSucceededJobs) {
         <td>
-          {jobLinks(executionTableRow.completedJobData)}
+          {descriptionCell(executionUIData)}
         </td>
-      }}
-      {if (showFailedJobs) {
-        <td>
-          {jobLinks(executionTableRow.failedJobData)}
+        <td sorttable_customkey={submissionTime.toString}>
+          {UIUtils.formatDate(submissionTime)}
+        </td>
+        <td sorttable_customkey={duration.toString}>
+          {UIUtils.formatDuration(duration)}
+        </td>
+        {if (showRunningJobs) {
+          <td>
+            {jobLinks(executionTableRow.runningJobData)}
+          </td>
+        }}
+        {if (showSucceededJobs) {
+          <td>
+            {jobLinks(executionTableRow.completedJobData)}
+          </td>
+        }}
+        {if (showFailedJobs) {
+          <td>
+            {jobLinks(executionTableRow.failedJobData)}
+          </td>
+        }}
+        {if (showSubExecutions) {
+          <td>
+            {executionLinks(executionTableRow.subExecutionData.map(_.executionUIData.executionId))}
+          </td>
+        }}
+      </tr>
+    }
+
+    val subRow: Seq[Node] = if (executionTableRow.subExecutionData.nonEmpty) {
+      <tr></tr>
+      <tr class="sub-execution-list collapsed">
+        <td></td>
+        <td colspan={s"${headerInfo.length - 1}"}>
+          <table class="table table-bordered table-sm 
table-cell-width-limited">
+            <thead>
+              <tr>
+                {headerInfo.dropRight(1).map(info => <th>{info._1}</th>)}
+              </tr>
+            </thead>
+            <tbody>
+              {
+                executionTableRow.subExecutionData.map { rowData =>
+                  val executionUIData = rowData.executionUIData
+                  val submissionTime = executionUIData.submissionTime
+                  val duration = rowData.duration
+                  <tr>
+                    <td>
+                      {executionUIData.executionId.toString}
+                    </td>
+                    <td>
+                      {descriptionCell(executionUIData)}
+                    </td>
+                    <td sorttable_customkey={submissionTime.toString}>
+                      {UIUtils.formatDate(submissionTime)}
+                    </td>
+                    <td sorttable_customkey={duration.toString}>
+                      {UIUtils.formatDuration(duration)}
+                    </td>
+                    {if (showRunningJobs) {
+                      <td>
+                        {jobLinks(rowData.runningJobData)}
+                      </td>
+                    }}
+                    {if (showSucceededJobs) {
+                      <td>
+                        {jobLinks(rowData.completedJobData)}
+                      </td>
+                    }}
+                    {if (showFailedJobs) {
+                      <td>
+                        {jobLinks(rowData.failedJobData)}
+                      </td>
+                    }}
+                  </tr>
+                }
+              }
+            </tbody>
+          </table>
         </td>
-      }}
-    </tr>
+      </tr>
+    } else {
+      Nil
+    }
+    baseRow ++ subRow
   }
 
   private def descriptionCell(execution: SQLExecutionUIData): Seq[Node] = {
@@ -358,7 +482,8 @@ private[ui] class ExecutionTableRowData(
     val executionUIData: SQLExecutionUIData,
     val runningJobData: Seq[Int],
     val completedJobData: Seq[Int],
-    val failedJobData: Seq[Int])
+    val failedJobData: Seq[Int],
+    val subExecutionData: Seq[ExecutionTableRowData])
 
 
 private[ui] class ExecutionDataSource(
@@ -369,7 +494,9 @@ private[ui] class ExecutionDataSource(
     desc: Boolean,
     showRunningJobs: Boolean,
     showSucceededJobs: Boolean,
-    showFailedJobs: Boolean) extends PagedDataSource[ExecutionTableRowData](pageSize) {
+    showFailedJobs: Boolean,
+    subExecutions: Map[Long, Seq[SQLExecutionUIData]])
+  extends PagedDataSource[ExecutionTableRowData](pageSize) {
 
  // Convert ExecutionData to ExecutionTableRowData which contains the final contents to show
  // in the table so that we can avoid creating duplicate contents during sorting the data
@@ -401,12 +528,18 @@ private[ui] class ExecutionDataSource(
       }.map { case (jobId, _) => jobId }.toSeq.sorted
     } else Seq.empty
 
+    val executions = subExecutions.get(executionUIData.executionId) match {
+      case Some(executions) => executions.map(executionRow)
+      case _ => Seq.empty
+    }
+
     new ExecutionTableRowData(
       duration,
       executionUIData,
       runningJobData,
       completedJobData,
-      failedJobData)
+      failedJobData,
+      executions)
   }
 
   /** Return Ordering according to sortColumn and desc. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 64c652542ff..32b215b1c2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -90,6 +90,7 @@ class SQLAppStatusListener(
           // data corresponding to the execId.
          val sqlStoreData = kvstore.read(classOf[SQLExecutionUIData], executionId)
           val executionData = new LiveExecutionData(executionId)
+          executionData.rootExecutionId = sqlStoreData.rootExecutionId
           executionData.description = sqlStoreData.description
           executionData.details = sqlStoreData.details
          executionData.physicalPlanDescription = sqlStoreData.physicalPlanDescription
@@ -340,7 +341,7 @@ class SQLAppStatusListener(
   }
 
   private def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit = {
-    val SparkListenerSQLExecutionStart(executionId, description, details,
+    val SparkListenerSQLExecutionStart(executionId, rootExecutionId, description, details,
       physicalPlanDescription, sparkPlanInfo, time, modifiedConfigs) = event
 
     val planGraph = SparkPlanGraph(sparkPlanInfo)
@@ -355,6 +356,7 @@ class SQLAppStatusListener(
     kvstore.write(graphToStore)
 
     val exec = getOrCreateExecution(executionId)
+    exec.rootExecutionId = rootExecutionId
     exec.description = description
     exec.details = details
     exec.physicalPlanDescription = physicalPlanDescription
@@ -483,6 +485,7 @@ class SQLAppStatusListener(
 
 private class LiveExecutionData(val executionId: Long) extends LiveEntity {
 
+  var rootExecutionId: Long = _
   var description: String = null
   var details: String = null
   var physicalPlanDescription: String = null
@@ -505,6 +508,7 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
   override protected def doUpdate(): Any = {
     new SQLExecutionUIData(
       executionId,
+      rootExecutionId,
       description,
       details,
       physicalPlanDescription,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
index 46827aaa1d9..6c92b98ca3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -83,6 +83,7 @@ class SQLAppStatusStore(
 
 class SQLExecutionUIData(
     @KVIndexParam val executionId: Long,
+    val rootExecutionId: Long,
     val description: String,
     val details: String,
     val physicalPlanDescription: String,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
index ce665e11893..b931b4fcde1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala
@@ -43,6 +43,8 @@ case class SparkListenerSQLAdaptiveSQLMetricUpdates(
 @DeveloperApi
 case class SparkListenerSQLExecutionStart(
     executionId: Long,
+    // if the execution is a root, then rootExecutionId == executionId
+    rootExecutionId: Long,
     description: String,
     details: String,
     physicalPlanDescription: String,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
index 8a258fb1219..5aa6ddbdb7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.sql.execution.ui
 
+import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.ui.{SparkUI, SparkUITab}
 
 class SQLTab(val sqlStore: SQLAppStatusStore, sparkUI: SparkUI)
   extends SparkUITab(sparkUI, "SQL") with Logging {
 
+  def conf: SparkConf = sparkUI.conf
+
   override val name = "SQL / DataFrame"
 
   val parent = sparkUI
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
index 09cef9663c0..1ccaf5c68c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
@@ -80,6 +80,7 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe {
 
     new SQLExecutionUIData(
       executionId = ui.getExecutionId,
+      rootExecutionId = ui.getRootExecutionId,
       description = ui.getDescription,
       details = ui.getDetails,
       physicalPlanDescription = ui.getPhysicalPlanDescription,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
index 9f2b08a65ea..e9d98ee9715 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
@@ -59,7 +59,7 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession {
 
      val reconstructedEvent = JsonProtocol.sparkEventFromJson(SQLExecutionStartJsonString)
       if (newExecutionStartEvent) {
-        val expectedEvent = SparkListenerSQLExecutionStart(0, "test desc", "test detail",
+        val expectedEvent = SparkListenerSQLExecutionStart(0, 0, "test desc", "test detail",
          "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0,
           Map("k1" -> "v1"))
         assert(reconstructedEvent == expectedEvent)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala
index 090c149886a..42b27bd9f28 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLEventFilterBuilderSuite.scala
@@ -57,7 +57,7 @@ class SQLEventFilterBuilderSuite extends SparkFunSuite {
     }
 
     // Start SQL Execution
-    listener.onOtherEvent(SparkListenerSQLExecutionStart(1, "desc1", "details1", "plan",
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(1, 1, "desc1", "details1", "plan",
      new SparkPlanInfo("node", "str", Seq.empty, Map.empty, Seq.empty), time, Map.empty))
 
     time += 1
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala
index 724df8ebe8b..f1b77e502df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/history/SQLLiveEntitiesEventFilterSuite.scala
@@ -41,7 +41,7 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite {
     val acceptFn = filter.acceptFn().lift
 
     // Verifying with finished SQL execution 1
-    assert(Some(false) === acceptFn(SparkListenerSQLExecutionStart(1, "description1", "details1",
+    assert(Some(false) === acceptFn(SparkListenerSQLExecutionStart(1, 1, "description1", "details1",
       "plan", null, 0, Map.empty)))
     assert(Some(false) === acceptFn(SparkListenerSQLExecutionEnd(1, 0)))
     assert(Some(false) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(1, "plan", null)))
@@ -88,7 +88,7 @@ class SQLLiveEntitiesEventFilterSuite extends SparkFunSuite {
     }
 
     // Verifying with live SQL execution 2
-    assert(Some(true) === acceptFn(SparkListenerSQLExecutionStart(2, "description2", "details2",
+    assert(Some(true) === acceptFn(SparkListenerSQLExecutionStart(2, 2, "description2", "details2",
       "plan", null, 0, Map.empty)))
     assert(Some(true) === acceptFn(SparkListenerSQLExecutionEnd(2, 0)))
     assert(Some(true) === acceptFn(SparkListenerSQLAdaptiveExecutionUpdate(2, "plan", null)))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
index a5368711260..7af58867f33 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/AllExecutionsPageSuite.scala
@@ -28,6 +28,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR
+import org.apache.spark.internal.config.UI.UI_SQL_GROUP_SUB_EXECUTION_ENABLED
 import org.apache.spark.scheduler.{JobFailed, SparkListenerJobEnd, SparkListenerJobStart}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
@@ -60,6 +61,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
     when(tab.sqlStore).thenReturn(statusStore)
 
     val request = mock(classOf[HttpServletRequest])
+    when(tab.conf).thenReturn(new SparkConf(false))
     when(tab.appName).thenReturn("testing")
     when(tab.headerTabs).thenReturn(Seq.empty)
 
@@ -74,6 +76,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
     when(tab.sqlStore).thenReturn(statusStore)
 
     val request = mock(classOf[HttpServletRequest])
+    when(tab.conf).thenReturn(new SparkConf(false))
     when(tab.appName).thenReturn("testing")
     when(tab.headerTabs).thenReturn(Seq.empty)
 
@@ -82,6 +85,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
       val page = new AllExecutionsPage(tab)
       val df = createTestDataFrame
       listener.onOtherEvent(SparkListenerSQLExecutionStart(
+        0,
         0,
         "test",
         "test",
@@ -109,6 +113,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
     val tab = mock(classOf[SQLTab], RETURNS_SMART_NULLS)
     val request = mock(classOf[HttpServletRequest])
 
+    when(tab.conf).thenReturn(new SparkConf(false))
     when(tab.sqlStore).thenReturn(statusStore)
     when(tab.appName).thenReturn("testing")
     when(tab.headerTabs).thenReturn(Seq.empty)
@@ -121,6 +126,50 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
     assert(html.contains("duration"))
   }
 
+  test("group sub executions") {
+    val statusStore = createStatusStore
+    val tab = mock(classOf[SQLTab], RETURNS_SMART_NULLS)
+    val request = mock(classOf[HttpServletRequest])
+
+    val sparkConf = new SparkConf(false).set(UI_SQL_GROUP_SUB_EXECUTION_ENABLED, true)
+    when(tab.conf).thenReturn(sparkConf)
+    when(tab.sqlStore).thenReturn(statusStore)
+    when(tab.appName).thenReturn("testing")
+    when(tab.headerTabs).thenReturn(Seq.empty)
+
+    val listener = statusStore.listener.get
+    val page = new AllExecutionsPage(tab)
+    val df = createTestDataFrame
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      0,
+      0,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      1,
+      0,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
+    // sub execution has a missing root execution
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      2,
+      100,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      System.currentTimeMillis()))
+    val html = page.render(request).toString().toLowerCase(Locale.ROOT)
+    assert(html.contains("sub execution ids") && 
html.contains("sub-execution-list"))
+    // sub execution should still be displayed if the root execution is missing
+    assert(html.contains("id=2"))
+  }
 
   protected def createStatusStore: SQLAppStatusStore
 
@@ -146,6 +195,7 @@ abstract class AllExecutionsPageSuite extends SharedSparkSession with BeforeAndA
     Seq(0, 1).foreach { executionId =>
       val df = createTestDataFrame
       listener.onOtherEvent(SparkListenerSQLExecutionStart(
+        executionId,
         executionId,
         "test",
         "test",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
index d994126fe63..3b9efb18057 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/MetricsAggregationBenchmark.scala
@@ -74,6 +74,7 @@ object MetricsAggregationBenchmark extends BenchmarkBase {
     val idgen = new AtomicInteger()
     val executionId = idgen.incrementAndGet()
     val executionStart = SparkListenerSQLExecutionStart(
+      executionId,
       executionId,
       getClass().getName(),
       getClass().getName(),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 3d80935fdda..26bb35cf0e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -190,6 +190,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
     }.toMap
 
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
       executionId,
       "test",
       "test",
@@ -343,7 +344,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
       val listener = new SparkListener {
         override def onOtherEvent(event: SparkListenerEvent): Unit = {
           event match {
-            case SparkListenerSQLExecutionStart(_, _, _, planDescription, _, _, _) =>
+            case SparkListenerSQLExecutionStart(_, _, _, _, planDescription, _, _, _) =>
               assert(expected.forall(planDescription.contains))
               checkDone = true
             case _ => // ignore other events
@@ -380,6 +381,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
     val executionId = 0
     val df = createTestDataFrame
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
       executionId,
       "test",
       "test",
@@ -410,6 +412,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
     val executionId = 0
     val df = createTestDataFrame
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
       executionId,
       "test",
       "test",
@@ -451,6 +454,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
     val executionId = 0
     val df = createTestDataFrame
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
       executionId,
       "test",
       "test",
@@ -481,6 +485,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
     val executionId = 0
     val df = createTestDataFrame
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
       executionId,
       "test",
       "test",
@@ -512,6 +517,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
     val executionId = 0
     val df = createTestDataFrame
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
       executionId,
       "test",
       "test",
@@ -652,6 +658,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
     // Start execution 1 and execution 2
     time += 1
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      1,
       1,
       "test",
       "test",
@@ -661,6 +668,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
       Map.empty))
     time += 1
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      2,
       2,
       "test",
       "test",
@@ -678,6 +686,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
     // Start execution 3 and execution 2 should be evicted.
     time += 1
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      3,
       3,
       "test",
       "test",
@@ -714,6 +723,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
         .allNodes.flatMap(_.metrics.map(_.accumulatorId))
 
     listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      executionId,
       executionId,
       "test",
       "test",
diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
index f6fd1fc42ce..a0154d724da 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
@@ -82,6 +82,7 @@ object SqlResourceSuite {
 
     new SQLExecutionUIData(
       executionId = 0,
+      rootExecutionId = 1,
       description = DESCRIPTION,
       details = "",
       physicalPlanDescription = PLAN_DESCRIPTION,
diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
index f7b783ef3ca..ddc693f1ee3 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -52,6 +52,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
 
     val input1 = new SQLExecutionUIData(
       executionId = templateData.executionId,
+      rootExecutionId = templateData.rootExecutionId,
       description = templateData.description,
       details = templateData.details,
       physicalPlanDescription = templateData.physicalPlanDescription,
@@ -71,6 +72,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
 
     val input2 = new SQLExecutionUIData(
       executionId = templateData.executionId,
+      rootExecutionId = templateData.rootExecutionId,
       description = templateData.description,
       details = templateData.details,
       physicalPlanDescription = templateData.physicalPlanDescription,

