Repository: spark Updated Branches: refs/heads/master 78e133141 -> 3494b1228
[SPARK-25566][SPARK-25567][WEBUI][SQL] Support pagination for SQL tab to avoid OOM ## What changes were proposed in this pull request? Currently, the SQL tab in the Web UI doesn't support pagination. Because of that, the following issues occur: 1) For a large number of executions (around 40,000), the SQL page throws an OOM exception. 2) For a large number of executions, the SQL page takes a long time to load. 3) It is difficult to analyze the execution table when there are a large number of executions. [Note: these observations were made with spark.sql.ui.retainedExecutions = 50000] All the other tabs (Jobs, Stages, etc.) support pagination, so the SQL tab should also support pagination to be consistent with them. I have followed the same flow as the pagination code in the Jobs and Stages pages for the SQL page. Apart from adding pagination support, this patch doesn't change any behavior of the SQL tab. ## How was this patch tested? bin/spark-shell --conf spark.sql.ui.retainedExecutions=50000 Run 50,000 SQL queries. **Before this PR** (content not included in this archive) **After this PR** (content not included in this archive) The page loads faster, and the OOM issue no longer occurs. Closes #22645 from shahidki31/SPARK-25566. 
Authored-by: Shahid <[email protected]> Signed-off-by: Sean Owen <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3494b122 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3494b122 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3494b122 Branch: refs/heads/master Commit: 3494b122814ed991b40dc4b80703c0ef55493d36 Parents: 78e1331 Author: Shahid <[email protected]> Authored: Fri Oct 12 12:36:35 2018 -0500 Committer: Sean Owen <[email protected]> Committed: Fri Oct 12 12:36:35 2018 -0500 ---------------------------------------------------------------------- .../scala/org/apache/spark/ui/PagedTable.scala | 4 +- .../sql/execution/ui/AllExecutionsPage.scala | 404 ++++++++++++++----- 2 files changed, 312 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3494b122/core/src/main/scala/org/apache/spark/ui/PagedTable.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala index 65fa383..2fc0259 100644 --- a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala @@ -31,7 +31,7 @@ import org.apache.spark.util.Utils * * @param pageSize the number of rows in a page */ -private[ui] abstract class PagedDataSource[T](val pageSize: Int) { +private[spark] abstract class PagedDataSource[T](val pageSize: Int) { if (pageSize <= 0) { throw new IllegalArgumentException("Page size must be positive") @@ -72,7 +72,7 @@ private[ui] case class PageData[T](totalPage: Int, data: Seq[T]) /** * A paged table that will generate a HTML table for a specified page and also the page navigation. 
*/ -private[ui] trait PagedTable[T] { +private[spark] trait PagedTable[T] { def tableId: String http://git-wip-us.apache.org/repos/asf/spark/blob/3494b122/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala index 1b2d8a8..1a25cd2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala @@ -17,16 +17,17 @@ package org.apache.spark.sql.execution.ui +import java.net.URLEncoder import javax.servlet.http.HttpServletRequest +import scala.collection.JavaConverters._ import scala.collection.mutable -import scala.xml.{Node, NodeSeq} - -import org.apache.commons.lang3.StringEscapeUtils +import scala.xml.{Node, NodeSeq, Unparsed} import org.apache.spark.JobExecutionStatus import org.apache.spark.internal.Logging -import org.apache.spark.ui.{UIUtils, WebUIPage} +import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils, WebUIPage} +import org.apache.spark.util.Utils private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with Logging { @@ -55,8 +56,8 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L val _content = mutable.ListBuffer[Node]() if (running.nonEmpty) { - val runningPageTable = new RunningExecutionTable( - parent, currentTime, running.sortBy(_.submissionTime).reverse).toNodeSeq(request) + val runningPageTable = + executionsTable(request, "running", running, currentTime, true, true, true) _content ++= <span id="running" class="collapse-aggregated-runningExecutions collapse-table" @@ -73,8 +74,8 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L } if (completed.nonEmpty) { - val 
completedPageTable = new CompletedExecutionTable( - parent, currentTime, completed.sortBy(_.submissionTime).reverse).toNodeSeq(request) + val completedPageTable = + executionsTable(request, "completed", completed, currentTime, false, true, false) _content ++= <span id="completed" class="collapse-aggregated-completedExecutions collapse-table" @@ -91,8 +92,8 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L } if (failed.nonEmpty) { - val failedPageTable = new FailedExecutionTable( - parent, currentTime, failed.sortBy(_.submissionTime).reverse).toNodeSeq(request) + val failedPageTable = + executionsTable(request, "failed", failed, currentTime, false, true, true) _content ++= <span id="failed" class="collapse-aggregated-failedExecutions collapse-table" @@ -121,7 +122,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L { if (running.nonEmpty) { <li> - <a href="#running-execution-table"><strong>Running Queries:</strong></a> + <a href="#running"><strong>Running Queries:</strong></a> {running.size} </li> } @@ -129,7 +130,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L { if (completed.nonEmpty) { <li> - <a href="#completed-execution-table"><strong>Completed Queries:</strong></a> + <a href="#completed"><strong>Completed Queries:</strong></a> {completed.size} </li> } @@ -137,50 +138,232 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L { if (failed.nonEmpty) { <li> - <a href="#failed-execution-table"><strong>Failed Queries:</strong></a> + <a href="#failed"><strong>Failed Queries:</strong></a> {failed.size} </li> } } </ul> </div> + UIUtils.headerSparkPage(request, "SQL", summary ++ content, parent, Some(5000)) } + + private def executionsTable( + request: HttpServletRequest, + executionTag: String, + executionData: Seq[SQLExecutionUIData], + currentTime: Long, + showRunningJobs: Boolean, + showSucceededJobs: Boolean, + showFailedJobs: 
Boolean): Seq[Node] = { + + // stripXSS is called to remove suspicious characters used in XSS attacks + val allParameters = request.getParameterMap.asScala.toMap.map { case (k, v) => + UIUtils.stripXSS(k) -> v.map(UIUtils.stripXSS).toSeq + } + val parameterOtherTable = allParameters.filterNot(_._1.startsWith(executionTag)) + .map(para => para._1 + "=" + para._2(0)) + + val parameterExecutionPage = UIUtils.stripXSS(request.getParameter(s"$executionTag.page")) + val parameterExecutionSortColumn = UIUtils.stripXSS(request + .getParameter(s"$executionTag.sort")) + val parameterExecutionSortDesc = UIUtils.stripXSS(request.getParameter(s"$executionTag.desc")) + val parameterExecutionPageSize = UIUtils.stripXSS(request + .getParameter(s"$executionTag.pageSize")) + val parameterExecutionPrevPageSize = UIUtils.stripXSS(request + .getParameter(s"$executionTag.prevPageSize")) + + val executionPage = Option(parameterExecutionPage).map(_.toInt).getOrElse(1) + val executionSortColumn = Option(parameterExecutionSortColumn).map { sortColumn => + UIUtils.decodeURLParameter(sortColumn) + }.getOrElse("ID") + val executionSortDesc = Option(parameterExecutionSortDesc).map(_.toBoolean).getOrElse( + // New executions should be shown above old executions by default. + executionSortColumn == "ID" + ) + val executionPageSize = Option(parameterExecutionPageSize).map(_.toInt).getOrElse(100) + val executionPrevPageSize = Option(parameterExecutionPrevPageSize).map(_.toInt) + .getOrElse(executionPageSize) + + // If the user has changed to a larger page size, then go to page 1 in order to avoid + // IndexOutOfBoundsException. 
+ val page: Int = if (executionPageSize <= executionPrevPageSize) { + executionPage + } else { + 1 + } + val tableHeaderId = executionTag // "running", "completed" or "failed" + + try { + new ExecutionPagedTable( + request, + parent, + executionData, + tableHeaderId, + executionTag, + UIUtils.prependBaseUri(request, parent.basePath), + "SQL", // subPath + parameterOtherTable, + currentTime, + pageSize = executionPageSize, + sortColumn = executionSortColumn, + desc = executionSortDesc, + showRunningJobs, + showSucceededJobs, + showFailedJobs).table(page) + } catch { + case e@(_: IllegalArgumentException | _: IndexOutOfBoundsException) => + <div class="alert alert-error"> + <p>Error while rendering execution table:</p> + <pre> + {Utils.exceptionString(e)} + </pre> + </div> + } + } } -private[ui] abstract class ExecutionTable( +private[ui] class ExecutionPagedTable( + request: HttpServletRequest, parent: SQLTab, - tableId: String, + data: Seq[SQLExecutionUIData], + tableHeaderId: String, + executionTag: String, + basePath: String, + subPath: String, + parameterOtherTable: Iterable[String], currentTime: Long, - executionUIDatas: Seq[SQLExecutionUIData], + pageSize: Int, + sortColumn: String, + desc: Boolean, showRunningJobs: Boolean, showSucceededJobs: Boolean, - showFailedJobs: Boolean) { + showFailedJobs: Boolean) extends PagedTable[ExecutionTableRowData] { - protected def baseHeader: Seq[String] = Seq( - "ID", - "Description", - "Submitted", - "Duration") + override val dataSource = new ExecutionDataSource( + request, + parent, + data, + basePath, + currentTime, + pageSize, + sortColumn, + desc, + showRunningJobs, + showSucceededJobs, + showFailedJobs) + + private val parameterPath = s"$basePath/$subPath/?${parameterOtherTable.mkString("&")}" + + override def tableId: String = s"$executionTag-table" + + override def tableCssClass: String = + "table table-bordered table-condensed table-striped " + + "table-head-clickable table-cell-width-limited" + + override def 
prevPageSizeFormField: String = s"$executionTag.prevPageSize" + + override def pageLink(page: Int): String = { + val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") + parameterPath + + s"&$pageNumberFormField=$page" + + s"&$executionTag.sort=$encodedSortColumn" + + s"&$executionTag.desc=$desc" + + s"&$pageSizeFormField=$pageSize" + + s"#$tableHeaderId" + } - protected def header: Seq[String] + override def pageSizeFormField: String = s"$executionTag.pageSize" - protected def row( - request: HttpServletRequest, - currentTime: Long, - executionUIData: SQLExecutionUIData): Seq[Node] = { - val submissionTime = executionUIData.submissionTime - val duration = executionUIData.completionTime.map(_.getTime()).getOrElse(currentTime) - - submissionTime + override def pageNumberFormField: String = s"$executionTag.page" + + override def goButtonFormPath: String = { + val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8") + s"$parameterPath&$executionTag.sort=$encodedSortColumn&$executionTag.desc=$desc#$tableHeaderId" + } + + override def headers: Seq[Node] = { + // Information for each header: title, sortable + val executionHeadersAndCssClasses: Seq[(String, Boolean)] = + Seq( + ("ID", true), + ("Description", true), + ("Submitted", true), + ("Duration", true)) ++ { + if (showRunningJobs && showSucceededJobs && showFailedJobs) { + Seq( + ("Running Job IDs", true), + ("Succeeded Job IDs", true), + ("Failed Job IDs", true)) + } else if (showSucceededJobs && showFailedJobs) { + Seq( + ("Succeeded Job IDs", true), + ("Failed Job IDs", true)) + } else { + Seq(("Job IDs", true)) + } + } - def jobLinks(status: JobExecutionStatus): Seq[Node] = { - executionUIData.jobs.flatMap { case (jobId, jobStatus) => - if (jobStatus == status) { - <a href={jobURL(request, jobId)}>[{jobId.toString}]</a> + val sortableColumnHeaders = executionHeadersAndCssClasses.filter { + case (_, sortable) => sortable + }.map { case (title, _) => title } + + 
require(sortableColumnHeaders.contains(sortColumn), s"Unknown column: $sortColumn") + + val headerRow: Seq[Node] = { + executionHeadersAndCssClasses.map { case (header, sortable) => + if (header == sortColumn) { + val headerLink = Unparsed( + parameterPath + + s"&$executionTag.sort=${URLEncoder.encode(header, "UTF-8")}" + + s"&$executionTag.desc=${!desc}" + + s"&$executionTag.pageSize=$pageSize" + + s"#$tableHeaderId") + val arrow = if (desc) "▾" else "▴" // UP or DOWN + + <th> + <a href={headerLink}> + {header}<span> + {Unparsed(arrow)} + </span> + </a> + </th> } else { - None + if (sortable) { + val headerLink = Unparsed( + parameterPath + + s"&$executionTag.sort=${URLEncoder.encode(header, "UTF-8")}" + + s"&$executionTag.pageSize=$pageSize" + + s"#$tableHeaderId") + + <th> + <a href={headerLink}> + {header} + </a> + </th> + } else { + <th> + {header} + </th> + } } - }.toSeq + } + } + <thead> + {headerRow} + </thead> + } + + override def row(executionTableRow: ExecutionTableRowData): Seq[Node] = { + val executionUIData = executionTableRow.executionUIData + val submissionTime = executionUIData.submissionTime + val duration = executionTableRow.duration + + def jobLinks(jobData: Seq[Int]): Seq[Node] = { + jobData.map { jobId => + <a href={jobURL(request, jobId)}>[{jobId.toString}]</a> + } } <tr> @@ -188,7 +371,7 @@ private[ui] abstract class ExecutionTable( {executionUIData.executionId.toString} </td> <td> - {descriptionCell(request, executionUIData)} + {descriptionCell(executionUIData)} </td> <td sorttable_customkey={submissionTime.toString}> {UIUtils.formatDate(submissionTime)} @@ -198,27 +381,26 @@ private[ui] abstract class ExecutionTable( </td> {if (showRunningJobs) { <td> - {jobLinks(JobExecutionStatus.RUNNING)} + {jobLinks(executionTableRow.runningJobData)} </td> }} {if (showSucceededJobs) { <td> - {jobLinks(JobExecutionStatus.SUCCEEDED)} + {jobLinks(executionTableRow.completedJobData)} </td> }} {if (showFailedJobs) { <td> - 
{jobLinks(JobExecutionStatus.FAILED)} + {jobLinks(executionTableRow.failedJobData)} </td> }} </tr> } - private def descriptionCell( - request: HttpServletRequest, - execution: SQLExecutionUIData): Seq[Node] = { + private def descriptionCell(execution: SQLExecutionUIData): Seq[Node] = { val details = if (execution.details != null && execution.details.nonEmpty) { - <span onclick="clickDetail(this)" class="expand-details"> + <span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')" + class="expand-details"> +details </span> ++ <div class="stage-details collapsed"> @@ -229,73 +411,107 @@ private[ui] abstract class ExecutionTable( } val desc = if (execution.description != null && execution.description.nonEmpty) { - <a href={executionURL(request, execution.executionId)}>{execution.description}</a> + <a href={executionURL(execution.executionId)}>{execution.description}</a> } else { - <a href={executionURL(request, execution.executionId)}>{execution.executionId}</a> + <a href={executionURL(execution.executionId)}>{execution.executionId}</a> } - <div>{desc} {details}</div> - } - - def toNodeSeq(request: HttpServletRequest): Seq[Node] = { - UIUtils.listingTable[SQLExecutionUIData]( - header, row(request, currentTime, _), executionUIDatas, id = Some(tableId)) + <div>{desc}{details}</div> } private def jobURL(request: HttpServletRequest, jobId: Long): String = "%s/jobs/job/?id=%s".format(UIUtils.prependBaseUri(request, parent.basePath), jobId) - private def executionURL(request: HttpServletRequest, executionID: Long): String = + private def executionURL(executionID: Long): String = s"${UIUtils.prependBaseUri( request, parent.basePath)}/${parent.prefix}/execution/?id=$executionID" } -private[ui] class RunningExecutionTable( - parent: SQLTab, - currentTime: Long, - executionUIDatas: Seq[SQLExecutionUIData]) - extends ExecutionTable( - parent, - "running-execution-table", - currentTime, - executionUIDatas, - showRunningJobs = true, - 
showSucceededJobs = true, - showFailedJobs = true) { - override protected def header: Seq[String] = - baseHeader ++ Seq("Running Job IDs", "Succeeded Job IDs", "Failed Job IDs") -} +private[ui] class ExecutionTableRowData( + val submissionTime: Long, + val duration: Long, + val executionUIData: SQLExecutionUIData, + val runningJobData: Seq[Int], + val completedJobData: Seq[Int], + val failedJobData: Seq[Int]) + -private[ui] class CompletedExecutionTable( +private[ui] class ExecutionDataSource( + request: HttpServletRequest, parent: SQLTab, + executionData: Seq[SQLExecutionUIData], + basePath: String, currentTime: Long, - executionUIDatas: Seq[SQLExecutionUIData]) - extends ExecutionTable( - parent, - "completed-execution-table", - currentTime, - executionUIDatas, - showRunningJobs = false, - showSucceededJobs = true, - showFailedJobs = false) { + pageSize: Int, + sortColumn: String, + desc: Boolean, + showRunningJobs: Boolean, + showSucceededJobs: Boolean, + showFailedJobs: Boolean) extends PagedDataSource[ExecutionTableRowData](pageSize) { - override protected def header: Seq[String] = baseHeader ++ Seq("Job IDs") -} + // Convert ExecutionData to ExecutionTableRowData which contains the final contents to show + // in the table so that we can avoid creating duplicate contents during sorting the data + private val data = executionData.map(executionRow).sorted(ordering(sortColumn, desc)) -private[ui] class FailedExecutionTable( - parent: SQLTab, - currentTime: Long, - executionUIDatas: Seq[SQLExecutionUIData]) - extends ExecutionTable( - parent, - "failed-execution-table", - currentTime, - executionUIDatas, - showRunningJobs = false, - showSucceededJobs = true, - showFailedJobs = true) { + private var _sliceExecutionIds: Set[Int] = _ + + override def dataSize: Int = data.size + + override def sliceData(from: Int, to: Int): Seq[ExecutionTableRowData] = { + val r = data.slice(from, to) + _sliceExecutionIds = r.map(_.executionUIData.executionId.toInt).toSet + r + } - 
override protected def header: Seq[String] = - baseHeader ++ Seq("Succeeded Job IDs", "Failed Job IDs") + private def executionRow(executionUIData: SQLExecutionUIData): ExecutionTableRowData = { + val submissionTime = executionUIData.submissionTime + val duration = executionUIData.completionTime.map(_.getTime()) + .getOrElse(currentTime) - submissionTime + + val runningJobData = if (showRunningJobs) { + executionUIData.jobs.filter { + case (_, jobStatus) => jobStatus == JobExecutionStatus.RUNNING + }.map { case (jobId, _) => jobId }.toSeq.sorted + } else Seq.empty + + val completedJobData = if (showSucceededJobs) { + executionUIData.jobs.filter { + case (_, jobStatus) => jobStatus == JobExecutionStatus.SUCCEEDED + }.map { case (jobId, _) => jobId }.toSeq.sorted + } else Seq.empty + + val failedJobData = if (showFailedJobs) { + executionUIData.jobs.filter { + case (_, jobStatus) => jobStatus == JobExecutionStatus.FAILED + }.map { case (jobId, _) => jobId }.toSeq.sorted + } else Seq.empty + + new ExecutionTableRowData( + submissionTime, + duration, + executionUIData, + runningJobData, + completedJobData, + failedJobData) + } + + /** Return Ordering according to sortColumn and desc. 
*/ + private def ordering(sortColumn: String, desc: Boolean): Ordering[ExecutionTableRowData] = { + val ordering: Ordering[ExecutionTableRowData] = sortColumn match { + case "ID" => Ordering.by(_.executionUIData.executionId) + case "Description" => Ordering.by(_.executionUIData.description) + case "Submitted" => Ordering.by(_.executionUIData.submissionTime) + case "Duration" => Ordering.by(_.duration) + case "Job IDs" | "Succeeded Job IDs" => Ordering by (_.completedJobData.headOption) + case "Running Job IDs" => Ordering.by(_.runningJobData.headOption) + case "Failed Job IDs" => Ordering.by(_.failedJobData.headOption) + case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn") + } + if (desc) { + ordering.reverse + } else { + ordering + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
