Repository: spark
Updated Branches:
refs/heads/master c979c8bba -> db36e1e75
[SPARK-15590][WEBUI] Paginate Job Table in Jobs tab
## What changes were proposed in this pull request?
This patch adds pagination support for the Job Tables in the Jobs tab.
Pagination is provided for all three Job Tables (active, completed, and
failed). Interactions with the paged tables (jumping to a page, sorting, and
setting the page size) are also supported.
The diff does not map some of the new lines back to the original ones. The
function `makeRow` of the original `AllJobsPage.scala` is reused: its logic is
split between the beginning of the function `jobRow` (L427-439) and the function
`row` (L594-618) in the new `AllJobsPage.scala`.
## How was this patch tested?
Tested manually by checking the Web UI after completing and failing
hundreds of jobs.
Generate completed jobs by:
```scala
val d = sc.parallelize(Array(1,2,3,4,5))
for(i <- 1 to 255){ var b = d.collect() }
```
Generate failed jobs by calling the following code multiple times:
```scala
var b = d.map(_/0).collect()
```
Interactions like jumping, sorting, and setting page size are all tested.
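This shows the pagination for completed jobs:
[screenshot not included in this archive]
This shows that sorting works in the job tables:
[screenshot not included in this archive]
This shows the pagination for failed jobs and the effect of jumping and setting
the page size:
[screenshot not included in this archive]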
Author: Tao Lin <[email protected]>
Closes #13620 from nblintao/dev.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/db36e1e7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/db36e1e7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/db36e1e7
Branch: refs/heads/master
Commit: db36e1e75d69d63b76312e85ae3a6c95cebbe65e
Parents: c979c8b
Author: Tao Lin <[email protected]>
Authored: Mon Jul 25 17:35:50 2016 -0700
Committer: Shixiong Zhu <[email protected]>
Committed: Mon Jul 25 17:35:50 2016 -0700
----------------------------------------------------------------------
.../org/apache/spark/ui/jobs/AllJobsPage.scala | 369 ++++++++++++++++---
.../org/apache/spark/ui/UISeleniumSuite.scala | 5 +-
2 files changed, 312 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/db36e1e7/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
index 035d706..e5363ce 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -17,17 +17,21 @@
package org.apache.spark.ui.jobs
+import java.net.URLEncoder
import java.util.Date
import javax.servlet.http.HttpServletRequest
+import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, ListBuffer}
import scala.xml._
import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.JobExecutionStatus
-import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
-import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData}
+import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.ui._
+import org.apache.spark.ui.jobs.UIData.{ExecutorUIData, JobUIData, StageUIData}
+import org.apache.spark.util.Utils
/** Page showing list of all ongoing and recently finished jobs */
private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
@@ -210,64 +214,69 @@ private[ui] class AllJobsPage(parent: JobsTab) extends
WebUIPage("") {
</script>
}
- private def jobsTable(jobs: Seq[JobUIData]): Seq[Node] = {
+ private def jobsTable(
+ request: HttpServletRequest,
+ jobTag: String,
+ jobs: Seq[JobUIData]): Seq[Node] = {
+ val allParameters = request.getParameterMap.asScala.toMap
+ val parameterOtherTable = allParameters.filterNot(_._1.startsWith(jobTag))
+ .map(para => para._1 + "=" + para._2(0))
+
val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)
+ val jobIdTitle = if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"
- val columns: Seq[Node] = {
- <th>{if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"}</th>
- <th>Description</th>
- <th>Submitted</th>
- <th>Duration</th>
- <th class="sorttable_nosort">Stages: Succeeded/Total</th>
- <th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th>
- }
+ val parameterJobPage = request.getParameter(jobTag + ".page")
+ val parameterJobSortColumn = request.getParameter(jobTag + ".sort")
+ val parameterJobSortDesc = request.getParameter(jobTag + ".desc")
+ val parameterJobPageSize = request.getParameter(jobTag + ".pageSize")
+ val parameterJobPrevPageSize = request.getParameter(jobTag +
".prevPageSize")
- def makeRow(job: JobUIData): Seq[Node] = {
- val (lastStageName, lastStageDescription) =
getLastStageNameAndDescription(job)
- val duration: Option[Long] = {
- job.submissionTime.map { start =>
- val end = job.completionTime.getOrElse(System.currentTimeMillis())
- end - start
- }
+ val jobPage = Option(parameterJobPage).map(_.toInt).getOrElse(1)
+ val jobSortColumn = Option(parameterJobSortColumn).map { sortColumn =>
+ UIUtils.decodeURLParameter(sortColumn)
+ }.getOrElse(jobIdTitle)
+ val jobSortDesc = Option(parameterJobSortDesc).map(_.toBoolean).getOrElse(
+ // New jobs should be shown above old jobs by default.
+ if (jobSortColumn == jobIdTitle) true else false
+ )
+ val jobPageSize = Option(parameterJobPageSize).map(_.toInt).getOrElse(100)
+ val jobPrevPageSize =
Option(parameterJobPrevPageSize).map(_.toInt).getOrElse(jobPageSize)
+
+ val page: Int = {
+ // If the user has changed to a larger page size, then go to page 1 in
order to avoid
+ // IndexOutOfBoundsException.
+ if (jobPageSize <= jobPrevPageSize) {
+ jobPage
+ } else {
+ 1
}
- val formattedDuration = duration.map(d =>
UIUtils.formatDuration(d)).getOrElse("Unknown")
- val formattedSubmissionTime =
job.submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
- val basePathUri = UIUtils.prependBaseUri(parent.basePath)
- val jobDescription =
- UIUtils.makeDescription(lastStageDescription, basePathUri, plainText =
false)
-
- val detailUrl = "%s/jobs/job?id=%s".format(basePathUri, job.jobId)
- <tr id={"job-" + job.jobId}>
- <td sorttable_customkey={job.jobId.toString}>
- {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
- </td>
- <td>
- {jobDescription}
- <a href={detailUrl} class="name-link">{lastStageName}</a>
- </td>
- <td sorttable_customkey={job.submissionTime.getOrElse(-1).toString}>
- {formattedSubmissionTime}
- </td>
- <td
sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
- <td class="stage-progress-cell">
- {job.completedStageIndices.size}/{job.stageIds.size -
job.numSkippedStages}
- {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
- {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
- </td>
- <td class="progress-cell">
- {UIUtils.makeProgressBar(started = job.numActiveTasks, completed =
job.numCompletedTasks,
- failed = job.numFailedTasks, skipped = job.numSkippedTasks, killed
= job.numKilledTasks,
- total = job.numTasks - job.numSkippedTasks)}
- </td>
- </tr>
}
+ val currentTime = System.currentTimeMillis()
- <table class="table table-bordered table-striped table-condensed sortable">
- <thead>{columns}</thead>
- <tbody>
- {jobs.map(makeRow)}
- </tbody>
- </table>
+ try {
+ new JobPagedTable(
+ jobs,
+ jobTag,
+ UIUtils.prependBaseUri(parent.basePath),
+ "jobs", // subPath
+ parameterOtherTable,
+ parent.jobProgresslistener.stageIdToInfo,
+ parent.jobProgresslistener.stageIdToData,
+ currentTime,
+ jobIdTitle,
+ pageSize = jobPageSize,
+ sortColumn = jobSortColumn,
+ desc = jobSortDesc
+ ).table(page)
+ } catch {
+ case e @ (_ : IllegalArgumentException | _ : IndexOutOfBoundsException)
=>
+ <div class="alert alert-error">
+ <p>Error while rendering job table:</p>
+ <pre>
+ {Utils.exceptionString(e)}
+ </pre>
+ </div>
+ }
}
def render(request: HttpServletRequest): Seq[Node] = {
@@ -279,12 +288,9 @@ private[ui] class AllJobsPage(parent: JobsTab) extends
WebUIPage("") {
val completedJobs = listener.completedJobs.reverse.toSeq
val failedJobs = listener.failedJobs.reverse.toSeq
- val activeJobsTable =
- jobsTable(activeJobs.sortBy(_.submissionTime.getOrElse(-1L)).reverse)
- val completedJobsTable =
-
jobsTable(completedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
- val failedJobsTable =
- jobsTable(failedJobs.sortBy(_.completionTime.getOrElse(-1L)).reverse)
+ val activeJobsTable = jobsTable(request, "activeJob", activeJobs)
+ val completedJobsTable = jobsTable(request, "completedJob",
completedJobs)
+ val failedJobsTable = jobsTable(request, "failedJob", failedJobs)
val shouldShowActiveJobs = activeJobs.nonEmpty
val shouldShowCompletedJobs = completedJobs.nonEmpty
@@ -369,3 +375,246 @@ private[ui] class AllJobsPage(parent: JobsTab) extends
WebUIPage("") {
}
}
}
+
+private[ui] class JobTableRowData(
+ val jobData: JobUIData,
+ val lastStageName: String,
+ val lastStageDescription: String,
+ val duration: Long,
+ val formattedDuration: String,
+ val submissionTime: Long,
+ val formattedSubmissionTime: String,
+ val jobDescription: NodeSeq,
+ val detailUrl: String)
+
+private[ui] class JobDataSource(
+ jobs: Seq[JobUIData],
+ stageIdToInfo: HashMap[Int, StageInfo],
+ stageIdToData: HashMap[(Int, Int), StageUIData],
+ basePath: String,
+ currentTime: Long,
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean) extends PagedDataSource[JobTableRowData](pageSize) {
+
+ // Convert JobUIData to JobTableRowData which contains the final contents to
show in the table
+ // so that we can avoid creating duplicate contents during sorting the data
+ private val data = jobs.map(jobRow).sorted(ordering(sortColumn, desc))
+
+ private var _slicedJobIds: Set[Int] = null
+
+ override def dataSize: Int = data.size
+
+ override def sliceData(from: Int, to: Int): Seq[JobTableRowData] = {
+ val r = data.slice(from, to)
+ _slicedJobIds = r.map(_.jobData.jobId).toSet
+ r
+ }
+
+ private def getLastStageNameAndDescription(job: JobUIData): (String, String)
= {
+ val lastStageInfo = Option(job.stageIds)
+ .filter(_.nonEmpty)
+ .flatMap { ids => stageIdToInfo.get(ids.max)}
+ val lastStageData = lastStageInfo.flatMap { s =>
+ stageIdToData.get((s.stageId, s.attemptId))
+ }
+ val name = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+ val description = lastStageData.flatMap(_.description).getOrElse("")
+ (name, description)
+ }
+
+ private def jobRow(jobData: JobUIData): JobTableRowData = {
+ val (lastStageName, lastStageDescription) =
getLastStageNameAndDescription(jobData)
+ val duration: Option[Long] = {
+ jobData.submissionTime.map { start =>
+ val end = jobData.completionTime.getOrElse(System.currentTimeMillis())
+ end - start
+ }
+ }
+ val formattedDuration = duration.map(d =>
UIUtils.formatDuration(d)).getOrElse("Unknown")
+ val submissionTime = jobData.submissionTime
+ val formattedSubmissionTime =
submissionTime.map(UIUtils.formatDate).getOrElse("Unknown")
+ val jobDescription = UIUtils.makeDescription(lastStageDescription,
basePath, plainText = false)
+
+ val detailUrl = "%s/jobs/job?id=%s".format(basePath, jobData.jobId)
+
+ new JobTableRowData (
+ jobData,
+ lastStageName,
+ lastStageDescription,
+ duration.getOrElse(-1),
+ formattedDuration,
+ submissionTime.getOrElse(-1),
+ formattedSubmissionTime,
+ jobDescription,
+ detailUrl
+ )
+ }
+
+ /**
+ * Return Ordering according to sortColumn and desc
+ */
+ private def ordering(sortColumn: String, desc: Boolean):
Ordering[JobTableRowData] = {
+ val ordering = sortColumn match {
+ case "Job Id" | "Job Id (Job Group)" => new Ordering[JobTableRowData] {
+ override def compare(x: JobTableRowData, y: JobTableRowData): Int =
+ Ordering.Int.compare(x.jobData.jobId, y.jobData.jobId)
+ }
+ case "Description" => new Ordering[JobTableRowData] {
+ override def compare(x: JobTableRowData, y: JobTableRowData): Int =
+ Ordering.String.compare(x.lastStageDescription,
y.lastStageDescription)
+ }
+ case "Submitted" => new Ordering[JobTableRowData] {
+ override def compare(x: JobTableRowData, y: JobTableRowData): Int =
+ Ordering.Long.compare(x.submissionTime, y.submissionTime)
+ }
+ case "Duration" => new Ordering[JobTableRowData] {
+ override def compare(x: JobTableRowData, y: JobTableRowData): Int =
+ Ordering.Long.compare(x.duration, y.duration)
+ }
+ case "Stages: Succeeded/Total" | "Tasks (for all stages):
Succeeded/Total" =>
+ throw new IllegalArgumentException(s"Unsortable column: $sortColumn")
+ case unknownColumn => throw new IllegalArgumentException(s"Unknown
column: $unknownColumn")
+ }
+ if (desc) {
+ ordering.reverse
+ } else {
+ ordering
+ }
+ }
+
+}
+private[ui] class JobPagedTable(
+ data: Seq[JobUIData],
+ jobTag: String,
+ basePath: String,
+ subPath: String,
+ parameterOtherTable: Iterable[String],
+ stageIdToInfo: HashMap[Int, StageInfo],
+ stageIdToData: HashMap[(Int, Int), StageUIData],
+ currentTime: Long,
+ jobIdTitle: String,
+ pageSize: Int,
+ sortColumn: String,
+ desc: Boolean
+ ) extends PagedTable[JobTableRowData] {
+ val parameterPath = UIUtils.prependBaseUri(basePath) + s"/$subPath/?" +
+ parameterOtherTable.mkString("&")
+
+ override def tableId: String = jobTag + "-table"
+
+ override def tableCssClass: String =
+ "table table-bordered table-condensed table-striped table-head-clickable"
+
+ override def pageSizeFormField: String = jobTag + ".pageSize"
+
+ override def prevPageSizeFormField: String = jobTag + ".prevPageSize"
+
+ override def pageNumberFormField: String = jobTag + ".page"
+
+ override val dataSource = new JobDataSource(
+ data,
+ stageIdToInfo,
+ stageIdToData,
+ basePath,
+ currentTime,
+ pageSize,
+ sortColumn,
+ desc)
+
+ override def pageLink(page: Int): String = {
+ val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+ parameterPath +
+ s"&$pageNumberFormField=$page" +
+ s"&$jobTag.sort=$encodedSortColumn" +
+ s"&$jobTag.desc=$desc" +
+ s"&$pageSizeFormField=$pageSize"
+ }
+
+ override def goButtonFormPath: String = {
+ val encodedSortColumn = URLEncoder.encode(sortColumn, "UTF-8")
+ s"$parameterPath&$jobTag.sort=$encodedSortColumn&$jobTag.desc=$desc"
+ }
+
+ override def headers: Seq[Node] = {
+ // Information for each header: title, cssClass, and sortable
+ val jobHeadersAndCssClasses: Seq[(String, String, Boolean)] =
+ Seq(
+ (jobIdTitle, "", true),
+ ("Description", "", true), ("Submitted", "", true), ("Duration", "",
true),
+ ("Stages: Succeeded/Total", "", false),
+ ("Tasks (for all stages): Succeeded/Total", "", false)
+ )
+
+ if (!jobHeadersAndCssClasses.filter(_._3).map(_._1).contains(sortColumn)) {
+ throw new IllegalArgumentException(s"Unknown column: $sortColumn")
+ }
+
+ val headerRow: Seq[Node] = {
+ jobHeadersAndCssClasses.map { case (header, cssClass, sortable) =>
+ if (header == sortColumn) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$jobTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
+ s"&$jobTag.desc=${!desc}" +
+ s"&$jobTag.pageSize=$pageSize")
+ val arrow = if (desc) "▾" else "▴" // UP or DOWN
+
+ <th class={cssClass}>
+ <a href={headerLink}>
+ {header}<span>
+ {Unparsed(arrow)}
+ </span>
+ </a>
+ </th>
+ } else {
+ if (sortable) {
+ val headerLink = Unparsed(
+ parameterPath +
+ s"&$jobTag.sort=${URLEncoder.encode(header, "UTF-8")}" +
+ s"&$jobTag.pageSize=$pageSize")
+
+ <th class={cssClass}>
+ <a href={headerLink}>
+ {header}
+ </a>
+ </th>
+ } else {
+ <th class={cssClass}>
+ {header}
+ </th>
+ }
+ }
+ }
+ }
+ <thead>{headerRow}</thead>
+ }
+
+ override def row(jobTableRow: JobTableRowData): Seq[Node] = {
+ val job = jobTableRow.jobData
+
+ <tr id={"job-" + job.jobId}>
+ <td>
+ {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
+ </td>
+ <td>
+ {jobTableRow.jobDescription}
+ <a href={jobTableRow.detailUrl}
class="name-link">{jobTableRow.lastStageName}</a>
+ </td>
+ <td>
+ {jobTableRow.formattedSubmissionTime}
+ </td>
+ <td>{jobTableRow.formattedDuration}</td>
+ <td class="stage-progress-cell">
+ {job.completedStageIndices.size}/{job.stageIds.size -
job.numSkippedStages}
+ {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
+ {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
+ </td>
+ <td class="progress-cell">
+ {UIUtils.makeProgressBar(started = job.numActiveTasks, completed =
job.numCompletedTasks,
+ failed = job.numFailedTasks, skipped = job.numSkippedTasks, killed =
job.numKilledTasks,
+ total = job.numTasks - job.numSkippedTasks)}
+ </td>
+ </tr>
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/db36e1e7/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index b0a35fe..fd12a21 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -218,7 +218,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser
with Matchers with B
eventually(timeout(5 seconds), interval(50 milliseconds)) {
goToUi(sc, "/jobs")
val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
- tableHeaders should not contain "Job Id (Job Group)"
+ tableHeaders(0) should not startWith "Job Id (Job Group)"
}
// Once at least one job has been run in a job group, then we should
display the group name:
sc.setJobGroup("my-job-group", "my-job-group-description")
@@ -226,7 +226,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser
with Matchers with B
eventually(timeout(5 seconds), interval(50 milliseconds)) {
goToUi(sc, "/jobs")
val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
- tableHeaders should contain ("Job Id (Job Group)")
+ // Can suffix up/down arrow in the header
+ tableHeaders(0) should startWith ("Job Id (Job Group)")
}
val jobJson = getJson(sc.ui.get, "jobs")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]