http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala index dc5b03c..e4cf99e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala @@ -22,120 +22,121 @@ import javax.servlet.http.HttpServletRequest import scala.xml.{Node, NodeSeq} import org.apache.spark.scheduler.Schedulable +import org.apache.spark.status.PoolData +import org.apache.spark.status.api.v1._ import org.apache.spark.ui.{UIUtils, WebUIPage} /** Page showing list of all ongoing and recently finished stages and pools */ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { private val sc = parent.sc - private val listener = parent.progressListener private def isFairScheduler = parent.isFairScheduler def render(request: HttpServletRequest): Seq[Node] = { - listener.synchronized { - val activeStages = listener.activeStages.values.toSeq - val pendingStages = listener.pendingStages.values.toSeq - val completedStages = listener.completedStages.reverse - val numCompletedStages = listener.numCompletedStages - val failedStages = listener.failedStages.reverse - val numFailedStages = listener.numFailedStages - val subPath = "stages" + val allStages = parent.store.stageList(null) - val activeStagesTable = - new StageTableBase(request, activeStages, "active", "activeStage", parent.basePath, subPath, - parent.progressListener, parent.isFairScheduler, - killEnabled = parent.killEnabled, isFailedStage = false) - val pendingStagesTable = - new StageTableBase(request, pendingStages, "pending", "pendingStage", parent.basePath, - subPath, parent.progressListener, parent.isFairScheduler, - killEnabled = false, isFailedStage = false) - val completedStagesTable = - new StageTableBase(request, completedStages, "completed", "completedStage", parent.basePath, - subPath, parent.progressListener, parent.isFairScheduler, - killEnabled = false, isFailedStage = false) - val failedStagesTable = - new StageTableBase(request, failedStages, "failed", "failedStage", parent.basePath, subPath, - parent.progressListener, parent.isFairScheduler, - killEnabled = false, isFailedStage = true) + val activeStages = allStages.filter(_.status == StageStatus.ACTIVE) + val pendingStages = allStages.filter(_.status == StageStatus.PENDING) + val completedStages = allStages.filter(_.status == StageStatus.COMPLETE) + val failedStages = allStages.filter(_.status == StageStatus.FAILED).reverse - // For now, pool information is only accessible in live UIs - val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable]) - val poolTable = new PoolTable(pools, parent) + val numCompletedStages = completedStages.size + val numFailedStages = failedStages.size + val subPath = "stages" - val shouldShowActiveStages = activeStages.nonEmpty - val shouldShowPendingStages = pendingStages.nonEmpty - val shouldShowCompletedStages = completedStages.nonEmpty - val shouldShowFailedStages = failedStages.nonEmpty + val activeStagesTable = + new StageTableBase(parent.store, request, activeStages, "active", "activeStage", + parent.basePath, subPath, parent.isFairScheduler, parent.killEnabled, false) + val pendingStagesTable = + new StageTableBase(parent.store, request, pendingStages, "pending", "pendingStage", + parent.basePath, subPath, parent.isFairScheduler, false, false) + val completedStagesTable = + new StageTableBase(parent.store, request, completedStages, "completed", "completedStage", + parent.basePath, subPath, parent.isFairScheduler, false, false) + val failedStagesTable = + new StageTableBase(parent.store, request, failedStages, "failed", "failedStage", + parent.basePath, subPath, parent.isFairScheduler, false, true) - val completedStageNumStr = if (numCompletedStages == completedStages.size) { - s"$numCompletedStages" - } else { - s"$numCompletedStages, only showing ${completedStages.size}" - } + // For now, pool information is only accessible in live UIs + val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable]).map { pool => + val uiPool = parent.store.asOption(parent.store.pool(pool.name)).getOrElse( + new PoolData(pool.name, Set())) + pool -> uiPool + }.toMap + val poolTable = new PoolTable(pools, parent) + + val shouldShowActiveStages = activeStages.nonEmpty + val shouldShowPendingStages = pendingStages.nonEmpty + val shouldShowCompletedStages = completedStages.nonEmpty + val shouldShowFailedStages = failedStages.nonEmpty + + val completedStageNumStr = if (numCompletedStages == completedStages.size) { + s"$numCompletedStages" + } else { + s"$numCompletedStages, only showing ${completedStages.size}" + } - val summary: NodeSeq = - <div> - <ul class="unstyled"> - { - if (shouldShowActiveStages) { - <li> - <a href="#active"><strong>Active Stages:</strong></a> - {activeStages.size} - </li> - } + val summary: NodeSeq = + <div> + <ul class="unstyled"> + { + if (shouldShowActiveStages) { + <li> + <a href="#active"><strong>Active Stages:</strong></a> + {activeStages.size} + </li> } - { - if (shouldShowPendingStages) { - <li> - <a href="#pending"><strong>Pending Stages:</strong></a> - {pendingStages.size} - </li> - } + } + { + if (shouldShowPendingStages) { + <li> + <a href="#pending"><strong>Pending Stages:</strong></a> + {pendingStages.size} + </li> } - { - if (shouldShowCompletedStages) { - <li id="completed-summary"> - <a href="#completed"><strong>Completed Stages:</strong></a> - {completedStageNumStr} - </li> - } + } + { + if (shouldShowCompletedStages) { + <li id="completed-summary"> + <a href="#completed"><strong>Completed Stages:</strong></a> + {completedStageNumStr} + </li> } - { - if (shouldShowFailedStages) { - <li> - <a href="#failed"><strong>Failed Stages:</strong></a> - {numFailedStages} - </li> - } + } + { + if (shouldShowFailedStages) { + <li> + <a href="#failed"><strong>Failed Stages:</strong></a> + {numFailedStages} + </li> } - </ul> - </div> - - var content = summary ++ - { - if (sc.isDefined && isFairScheduler) { - <h4>Fair Scheduler Pools ({pools.size})</h4> ++ poolTable.toNodeSeq - } else { - Seq.empty[Node] } + </ul> + </div> + + var content = summary ++ + { + if (sc.isDefined && isFairScheduler) { + <h4>Fair Scheduler Pools ({pools.size})</h4> ++ poolTable.toNodeSeq + } else { + Seq.empty[Node] } - if (shouldShowActiveStages) { - content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++ - activeStagesTable.toNodeSeq - } - if (shouldShowPendingStages) { - content ++= <h4 id="pending">Pending Stages ({pendingStages.size})</h4> ++ - pendingStagesTable.toNodeSeq - } - if (shouldShowCompletedStages) { - content ++= <h4 id="completed">Completed Stages ({completedStageNumStr})</h4> ++ - completedStagesTable.toNodeSeq - } - if (shouldShowFailedStages) { - content ++= <h4 id ="failed">Failed Stages ({numFailedStages})</h4> ++ - failedStagesTable.toNodeSeq } - UIUtils.headerSparkPage("Stages for All Jobs", content, parent) + if (shouldShowActiveStages) { + content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++ + activeStagesTable.toNodeSeq } + if (shouldShowPendingStages) { + content ++= <h4 id="pending">Pending Stages ({pendingStages.size})</h4> ++ + pendingStagesTable.toNodeSeq + } + if (shouldShowCompletedStages) { + content ++= <h4 id="completed">Completed Stages ({completedStageNumStr})</h4> ++ + completedStagesTable.toNodeSeq + } + if (shouldShowFailedStages) { + content ++= <h4 id ="failed">Failed Stages ({numFailedStages})</h4> ++ + failedStagesTable.toNodeSeq + } + UIUtils.headerSparkPage("Stages for All Jobs", content, parent) } } -
http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 07a41d1..41d42b5 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -17,44 +17,19 @@ package org.apache.spark.ui.jobs -import scala.collection.mutable import scala.xml.{Node, Unparsed} import org.apache.spark.status.AppStatusStore +import org.apache.spark.status.api.v1.StageData import org.apache.spark.ui.{ToolTips, UIUtils} -import org.apache.spark.ui.jobs.UIData.StageUIData import org.apache.spark.util.Utils /** Stage summary grouped by executors. */ -private[ui] class ExecutorTable( - stageId: Int, - stageAttemptId: Int, - parent: StagesTab, - store: AppStatusStore) { - private val listener = parent.progressListener +private[ui] class ExecutorTable(stage: StageData, store: AppStatusStore) { - def toNodeSeq: Seq[Node] = { - listener.synchronized { - executorTable() - } - } - - /** Special table which merges two header cells. */ - private def executorTable[T](): Seq[Node] = { - val stageData = listener.stageIdToData.get((stageId, stageAttemptId)) - var hasInput = false - var hasOutput = false - var hasShuffleWrite = false - var hasShuffleRead = false - var hasBytesSpilled = false - stageData.foreach { data => - hasInput = data.hasInput - hasOutput = data.hasOutput - hasShuffleRead = data.hasShuffleRead - hasShuffleWrite = data.hasShuffleWrite - hasBytesSpilled = data.hasBytesSpilled - } + import ApiHelper._ + def toNodeSeq: Seq[Node] = { <table class={UIUtils.TABLE_CLASS_STRIPED_SORTABLE}> <thead> <th id="executorid">Executor ID</th> @@ -64,29 +39,29 @@ private[ui] class ExecutorTable( <th>Failed Tasks</th> <th>Killed Tasks</th> <th>Succeeded Tasks</th> - {if (hasInput) { + {if (hasInput(stage)) { <th> <span data-toggle="tooltip" title={ToolTips.INPUT}>Input Size / Records</span> </th> }} - {if (hasOutput) { + {if (hasOutput(stage)) { <th> <span data-toggle="tooltip" title={ToolTips.OUTPUT}>Output Size / Records</span> </th> }} - {if (hasShuffleRead) { + {if (hasShuffleRead(stage)) { <th> <span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}> Shuffle Read Size / Records</span> </th> }} - {if (hasShuffleWrite) { + {if (hasShuffleWrite(stage)) { <th> <span data-toggle="tooltip" title={ToolTips.SHUFFLE_WRITE}> Shuffle Write Size / Records</span> </th> }} - {if (hasBytesSpilled) { + {if (hasBytesSpilled(stage)) { <th>Shuffle Spill (Memory)</th> <th>Shuffle Spill (Disk)</th> }} @@ -97,7 +72,7 @@ private[ui] class ExecutorTable( </th> </thead> <tbody> - {createExecutorTable()} + {createExecutorTable(stage)} </tbody> </table> <script> @@ -111,68 +86,57 @@ private[ui] class ExecutorTable( </script> } - private def createExecutorTable() : Seq[Node] = { - // Make an executor-id -> address map - val executorIdToAddress = mutable.HashMap[String, String]() - listener.blockManagerIds.foreach { blockManagerId => - val address = blockManagerId.hostPort - val executorId = blockManagerId.executorId - executorIdToAddress.put(executorId, address) - } - - listener.stageIdToData.get((stageId, stageAttemptId)) match { - case Some(stageData: StageUIData) => - stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) => - <tr> - <td> - <div style="float: left">{k}</div> - <div style="float: right"> - { - store.executorSummary(k).map(_.executorLogs).getOrElse(Map.empty).map { - case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div> - } - } - </div> - </td> - <td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td> - <td sorttable_customkey={v.taskTime.toString}>{UIUtils.formatDuration(v.taskTime)}</td> - <td>{v.failedTasks + v.succeededTasks + v.reasonToNumKilled.values.sum}</td> - <td>{v.failedTasks}</td> - <td>{v.reasonToNumKilled.values.sum}</td> - <td>{v.succeededTasks}</td> - {if (stageData.hasInput) { - <td sorttable_customkey={v.inputBytes.toString}> - {s"${Utils.bytesToString(v.inputBytes)} / ${v.inputRecords}"} - </td> - }} - {if (stageData.hasOutput) { - <td sorttable_customkey={v.outputBytes.toString}> - {s"${Utils.bytesToString(v.outputBytes)} / ${v.outputRecords}"} - </td> - }} - {if (stageData.hasShuffleRead) { - <td sorttable_customkey={v.shuffleRead.toString}> - {s"${Utils.bytesToString(v.shuffleRead)} / ${v.shuffleReadRecords}"} - </td> - }} - {if (stageData.hasShuffleWrite) { - <td sorttable_customkey={v.shuffleWrite.toString}> - {s"${Utils.bytesToString(v.shuffleWrite)} / ${v.shuffleWriteRecords}"} - </td> - }} - {if (stageData.hasBytesSpilled) { - <td sorttable_customkey={v.memoryBytesSpilled.toString}> - {Utils.bytesToString(v.memoryBytesSpilled)} - </td> - <td sorttable_customkey={v.diskBytesSpilled.toString}> - {Utils.bytesToString(v.diskBytesSpilled)} - </td> - }} - <td>{v.isBlacklisted}</td> - </tr> - } - case None => - Seq.empty[Node] + private def createExecutorTable(stage: StageData) : Seq[Node] = { + stage.executorSummary.getOrElse(Map.empty).toSeq.sortBy(_._1).map { case (k, v) => + val executor = store.asOption(store.executorSummary(k)) + <tr> + <td> + <div style="float: left">{k}</div> + <div style="float: right"> + { + executor.map(_.executorLogs).getOrElse(Map.empty).map { + case (logName, logUrl) => <div><a href={logUrl}>{logName}</a></div> + } + } + </div> + </td> + <td>{executor.map { e => e.hostPort }.getOrElse("CANNOT FIND ADDRESS")}</td> + <td sorttable_customkey={v.taskTime.toString}>{UIUtils.formatDuration(v.taskTime)}</td> + <td>{v.failedTasks + v.succeededTasks + v.killedTasks}</td> + <td>{v.failedTasks}</td> + <td>{v.killedTasks}</td> + <td>{v.succeededTasks}</td> + {if (hasInput(stage)) { + <td sorttable_customkey={v.inputBytes.toString}> + {s"${Utils.bytesToString(v.inputBytes)} / ${v.inputRecords}"} + </td> + }} + {if (hasOutput(stage)) { + <td sorttable_customkey={v.outputBytes.toString}> + {s"${Utils.bytesToString(v.outputBytes)} / ${v.outputRecords}"} + </td> + }} + {if (hasShuffleRead(stage)) { + <td sorttable_customkey={v.shuffleRead.toString}> + {s"${Utils.bytesToString(v.shuffleRead)} / ${v.shuffleReadRecords}"} + </td> + }} + {if (hasShuffleWrite(stage)) { + <td sorttable_customkey={v.shuffleWrite.toString}> + {s"${Utils.bytesToString(v.shuffleWrite)} / ${v.shuffleWriteRecords}"} + </td> + }} + {if (hasBytesSpilled(stage)) { + <td sorttable_customkey={v.memoryBytesSpilled.toString}> + {Utils.bytesToString(v.memoryBytesSpilled)} + </td> + <td sorttable_customkey={v.diskBytesSpilled.toString}> + {Utils.bytesToString(v.diskBytesSpilled)} + </td> + }} + <td>{executor.map(_.isBlacklisted).getOrElse(false)}</td> + </tr> } } + } http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index 7ed0164..740f12e7 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -17,7 +17,7 @@ package org.apache.spark.ui.jobs -import java.util.{Date, Locale} +import java.util.Locale import javax.servlet.http.HttpServletRequest import scala.collection.mutable.{Buffer, ListBuffer} @@ -27,11 +27,12 @@ import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.JobExecutionStatus import org.apache.spark.scheduler._ -import org.apache.spark.status.api.v1.ExecutorSummary -import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} +import org.apache.spark.status.AppStatusStore +import org.apache.spark.status.api.v1 +import org.apache.spark.ui._ /** Page showing statistics and stage list for a given job */ -private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { +private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIPage("job") { private val STAGES_LEGEND = <div class="legend-area"><svg width="150px" height="85px"> @@ -56,14 +57,15 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { <text x="35px" y="42px">Removed</text> </svg></div>.toString.filter(_ != '\n') - private def makeStageEvent(stageInfos: Seq[StageInfo]): Seq[String] = { + private def makeStageEvent(stageInfos: Seq[v1.StageData]): Seq[String] = { stageInfos.map { stage => val stageId = stage.stageId val attemptId = stage.attemptId val name = stage.name - val status = stage.getStatusString - val submissionTime = stage.submissionTime.get - val completionTime = stage.completionTime.getOrElse(System.currentTimeMillis()) + val status = stage.status.toString + val submissionTime = stage.submissionTime.get.getTime() + val completionTime = stage.completionTime.map(_.getTime()) + .getOrElse(System.currentTimeMillis()) // The timeline library treats contents as HTML, so we have to escape them. We need to add // extra layers of escaping in order to embed this in a Javascript string literal. @@ -79,10 +81,10 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { | 'data-placement="top" data-html="true"' + | 'data-title="${jsEscapedName} (Stage ${stageId}.${attemptId})<br>' + | 'Status: ${status.toUpperCase(Locale.ROOT)}<br>' + - | 'Submitted: ${UIUtils.formatDate(new Date(submissionTime))}' + + | 'Submitted: ${UIUtils.formatDate(submissionTime)}' + | '${ if (status != "running") { - s"""<br>Completed: ${UIUtils.formatDate(new Date(completionTime))}""" + s"""<br>Completed: ${UIUtils.formatDate(completionTime)}""" } else { "" } @@ -93,7 +95,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { } } - def makeExecutorEvent(executors: Seq[ExecutorSummary]): Seq[String] = { + def makeExecutorEvent(executors: Seq[v1.ExecutorSummary]): Seq[String] = { val events = ListBuffer[String]() executors.foreach { e => val addedEvent = @@ -137,8 +139,8 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { } private def makeTimeline( - stages: Seq[StageInfo], - executors: Seq[ExecutorSummary], + stages: Seq[v1.StageData], + executors: Seq[v1.ExecutorSummary], appStartTime: Long): Seq[Node] = { val stageEventJsonAsStrSeq = makeStageEvent(stages) @@ -182,173 +184,181 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") { } def render(request: HttpServletRequest): Seq[Node] = { - val listener = parent.jobProgresslistener + // stripXSS is called first to remove suspicious characters used in XSS attacks + val parameterId = UIUtils.stripXSS(request.getParameter("id")) + require(parameterId != null && parameterId.nonEmpty, "Missing id parameter") - listener.synchronized { - // stripXSS is called first to remove suspicious characters used in XSS attacks - val parameterId = UIUtils.stripXSS(request.getParameter("id")) - require(parameterId != null && parameterId.nonEmpty, "Missing id parameter") - - val jobId = parameterId.toInt - val jobDataOption = listener.jobIdToData.get(jobId) - if (jobDataOption.isEmpty) { - val content = - <div id="no-info"> - <p>No information to display for job {jobId}</p> - </div> - return UIUtils.headerSparkPage( - s"Details for Job $jobId", content, parent) - } - val jobData = jobDataOption.get - val isComplete = jobData.status != JobExecutionStatus.RUNNING - val stages = jobData.stageIds.map { stageId => - // This could be empty if the JobProgressListener hasn't received information about the - // stage or if the stage information has been garbage collected - listener.stageIdToInfo.getOrElse(stageId, - new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown")) + val jobId = parameterId.toInt + val jobData = store.asOption(store.job(jobId)).getOrElse { + val content = + <div id="no-info"> + <p>No information to display for job {jobId}</p> + </div> + return UIUtils.headerSparkPage( + s"Details for Job $jobId", content, parent) + } + val isComplete = jobData.status != JobExecutionStatus.RUNNING + val stages = jobData.stageIds.map { stageId => + // This could be empty if the listener hasn't received information about the + // stage or if the stage information has been garbage collected + store.stageData(stageId).lastOption.getOrElse { + new v1.StageData( + v1.StageStatus.PENDING, + stageId, + 0, 0, 0, 0, 0, 0, 0, + 0L, 0L, None, None, None, None, + 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, + "Unknown", + None, + "Unknown", + null, + Nil, + Nil, + None, + None, + Map()) } + } - val activeStages = Buffer[StageInfo]() - val completedStages = Buffer[StageInfo]() - // If the job is completed, then any pending stages are displayed as "skipped": - val pendingOrSkippedStages = Buffer[StageInfo]() - val failedStages = Buffer[StageInfo]() - for (stage <- stages) { - if (stage.submissionTime.isEmpty) { - pendingOrSkippedStages += stage - } else if (stage.completionTime.isDefined) { - if (stage.failureReason.isDefined) { - failedStages += stage - } else { - completedStages += stage - } + val activeStages = Buffer[v1.StageData]() + val completedStages = Buffer[v1.StageData]() + // If the job is completed, then any pending stages are displayed as "skipped": + val pendingOrSkippedStages = Buffer[v1.StageData]() + val failedStages = Buffer[v1.StageData]() + for (stage <- stages) { + if (stage.submissionTime.isEmpty) { + pendingOrSkippedStages += stage + } else if (stage.completionTime.isDefined) { + if (stage.status == v1.StageStatus.FAILED) { + failedStages += stage } else { - activeStages += stage + completedStages += stage } + } else { + activeStages += stage } + } - val basePath = "jobs/job" + val basePath = "jobs/job" - val pendingOrSkippedTableId = - if (isComplete) { - "pending" - } else { - "skipped" - } + val pendingOrSkippedTableId = + if (isComplete) { + "pending" + } else { + "skipped" + } - val activeStagesTable = - new StageTableBase(request, activeStages, "active", "activeStage", parent.basePath, - basePath, parent.jobProgresslistener, parent.isFairScheduler, - killEnabled = parent.killEnabled, isFailedStage = false) - val pendingOrSkippedStagesTable = - new StageTableBase(request, pendingOrSkippedStages, pendingOrSkippedTableId, "pendingStage", - parent.basePath, basePath, parent.jobProgresslistener, parent.isFairScheduler, - killEnabled = false, isFailedStage = false) - val completedStagesTable = - new StageTableBase(request, completedStages, "completed", "completedStage", parent.basePath, - basePath, parent.jobProgresslistener, parent.isFairScheduler, - killEnabled = false, isFailedStage = false) - val failedStagesTable = - new StageTableBase(request, failedStages, "failed", "failedStage", parent.basePath, - basePath, parent.jobProgresslistener, parent.isFairScheduler, - killEnabled = false, isFailedStage = true) + val activeStagesTable = + new StageTableBase(store, request, activeStages, "active", "activeStage", parent.basePath, + basePath, parent.isFairScheduler, + killEnabled = parent.killEnabled, isFailedStage = false) + val pendingOrSkippedStagesTable = + new StageTableBase(store, request, pendingOrSkippedStages, pendingOrSkippedTableId, + "pendingStage", parent.basePath, basePath, parent.isFairScheduler, + killEnabled = false, isFailedStage = false) + val completedStagesTable = + new StageTableBase(store, request, completedStages, "completed", "completedStage", + parent.basePath, basePath, parent.isFairScheduler, + killEnabled = false, isFailedStage = false) + val failedStagesTable = + new StageTableBase(store, request, failedStages, "failed", "failedStage", parent.basePath, + basePath, parent.isFairScheduler, + killEnabled = false, isFailedStage = true) - val shouldShowActiveStages = activeStages.nonEmpty - val shouldShowPendingStages = !isComplete && pendingOrSkippedStages.nonEmpty - val shouldShowCompletedStages = completedStages.nonEmpty - val shouldShowSkippedStages = isComplete && pendingOrSkippedStages.nonEmpty - val shouldShowFailedStages = failedStages.nonEmpty + val shouldShowActiveStages = activeStages.nonEmpty + val shouldShowPendingStages = !isComplete && pendingOrSkippedStages.nonEmpty + val shouldShowCompletedStages = completedStages.nonEmpty + val shouldShowSkippedStages = isComplete && pendingOrSkippedStages.nonEmpty + val shouldShowFailedStages = failedStages.nonEmpty - val summary: NodeSeq = - <div> - <ul class="unstyled"> - <li> - <Strong>Status:</Strong> - {jobData.status} - </li> - { - if (jobData.jobGroup.isDefined) { - <li> - <strong>Job Group:</strong> - {jobData.jobGroup.get} - </li> - } - } - { - if (shouldShowActiveStages) { - <li> - <a href="#active"><strong>Active Stages:</strong></a> - {activeStages.size} - </li> - } - } - { - if (shouldShowPendingStages) { - <li> - <a href="#pending"> - <strong>Pending Stages:</strong> - </a>{pendingOrSkippedStages.size} - </li> - } + val summary: NodeSeq = + <div> + <ul class="unstyled"> + <li> + <Strong>Status:</Strong> + {jobData.status} + </li> + { + if (jobData.jobGroup.isDefined) { + <li> + <strong>Job Group:</strong> + {jobData.jobGroup.get} + </li> } - { - if (shouldShowCompletedStages) { - <li> - <a href="#completed"><strong>Completed Stages:</strong></a> - {completedStages.size} - </li> - } + } + { + if (shouldShowActiveStages) { + <li> + <a href="#active"><strong>Active Stages:</strong></a> + {activeStages.size} + </li> } - { - if (shouldShowSkippedStages) { + } + { + if (shouldShowPendingStages) { <li> - <a href="#skipped"><strong>Skipped Stages:</strong></a> - {pendingOrSkippedStages.size} + <a href="#pending"> + <strong>Pending Stages:</strong> + </a>{pendingOrSkippedStages.size} </li> } + } + { + if (shouldShowCompletedStages) { + <li> + <a href="#completed"><strong>Completed Stages:</strong></a> + {completedStages.size} + </li> } - { - if (shouldShowFailedStages) { - <li> - <a href="#failed"><strong>Failed Stages:</strong></a> - {failedStages.size} - </li> - } + } + { + if (shouldShowSkippedStages) { + <li> + <a href="#skipped"><strong>Skipped Stages:</strong></a> + {pendingOrSkippedStages.size} + </li> + } + } + { + if (shouldShowFailedStages) { + <li> + <a href="#failed"><strong>Failed Stages:</strong></a> + {failedStages.size} + </li> } - </ul> - </div> + } + </ul> + </div> - var content = summary - val appStartTime = listener.startTime - val operationGraphListener = parent.operationGraphListener + var content = summary + val appStartTime = store.applicationInfo().attempts.head.startTime.getTime() - content ++= makeTimeline(activeStages ++ completedStages ++ failedStages, - parent.parent.store.executorList(false), appStartTime) + content ++= makeTimeline(activeStages ++ completedStages ++ failedStages, + store.executorList(false), appStartTime) - content ++= UIUtils.showDagVizForJob( - jobId, operationGraphListener.getOperationGraphForJob(jobId)) + content ++= UIUtils.showDagVizForJob( + jobId, store.operationGraphForJob(jobId)) - if (shouldShowActiveStages) { - content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++ - activeStagesTable.toNodeSeq - } - if (shouldShowPendingStages) { - content ++= <h4 id="pending">Pending Stages ({pendingOrSkippedStages.size})</h4> ++ - pendingOrSkippedStagesTable.toNodeSeq - } - if (shouldShowCompletedStages) { - content ++= <h4 id="completed">Completed Stages ({completedStages.size})</h4> ++ - completedStagesTable.toNodeSeq - } - if (shouldShowSkippedStages) { - content ++= <h4 id="skipped">Skipped Stages ({pendingOrSkippedStages.size})</h4> ++ - pendingOrSkippedStagesTable.toNodeSeq - } - if (shouldShowFailedStages) { - content ++= <h4 id ="failed">Failed Stages ({failedStages.size})</h4> ++ - failedStagesTable.toNodeSeq - } - UIUtils.headerSparkPage(s"Details for Job $jobId", content, parent, showVisualization = true) + if (shouldShowActiveStages) { + content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++ + activeStagesTable.toNodeSeq + } + if (shouldShowPendingStages) { + content ++= <h4 id="pending">Pending Stages ({pendingOrSkippedStages.size})</h4> ++ + pendingOrSkippedStagesTable.toNodeSeq + } + if (shouldShowCompletedStages) { + content ++= <h4 id="completed">Completed Stages ({completedStages.size})</h4> ++ + completedStagesTable.toNodeSeq + } + if (shouldShowSkippedStages) { + content ++= <h4 id="skipped">Skipped Stages ({pendingOrSkippedStages.size})</h4> ++ + pendingOrSkippedStagesTable.toNodeSeq + } + if (shouldShowFailedStages) { + content ++= <h4 id ="failed">Failed Stages ({failedStages.size})</h4> ++ + failedStagesTable.toNodeSeq } + UIUtils.headerSparkPage(s"Details for Job $jobId", content, parent, showVisualization = true) } } http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala index 81ffe04..99eab1b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala @@ -19,35 +19,45 @@ package org.apache.spark.ui.jobs import javax.servlet.http.HttpServletRequest +import scala.collection.JavaConverters._ + +import org.apache.spark.JobExecutionStatus import org.apache.spark.scheduler.SchedulingMode -import org.apache.spark.ui.{SparkUI, SparkUITab, UIUtils} +import org.apache.spark.status.AppStatusStore +import org.apache.spark.ui._ /** Web UI showing progress status of all jobs in the given SparkContext. */ -private[ui] class JobsTab(val parent: SparkUI) extends SparkUITab(parent, "jobs") { +private[ui] class JobsTab(parent: SparkUI, store: AppStatusStore) + extends SparkUITab(parent, "jobs") { + val sc = parent.sc val killEnabled = parent.killEnabled - val jobProgresslistener = parent.jobProgressListener - val operationGraphListener = parent.operationGraphListener - def isFairScheduler: Boolean = - jobProgresslistener.schedulingMode == Some(SchedulingMode.FAIR) + def isFairScheduler: Boolean = { + store.environmentInfo().sparkProperties.toMap + .get("spark.scheduler.mode") + .map { mode => mode == SchedulingMode.FAIR } + .getOrElse(false) + } def getSparkUser: String = parent.getSparkUser - attachPage(new AllJobsPage(this)) - attachPage(new JobPage(this)) + attachPage(new AllJobsPage(this, store)) + attachPage(new JobPage(this, store)) def handleKillRequest(request: HttpServletRequest): Unit = { if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) { // stripXSS is called first to remove suspicious characters used in XSS attacks val jobId = Option(UIUtils.stripXSS(request.getParameter("id"))).map(_.toInt) jobId.foreach { id => - if (jobProgresslistener.activeJobs.contains(id)) { - sc.foreach(_.cancelJob(id)) - // Do a quick pause here to give Spark time to kill the job so it shows up as - // killed after the refresh. Note that this will block the serving thread so the - // time should be limited in duration. - Thread.sleep(100) + store.asOption(store.job(id)).foreach { job => + if (job.status == JobExecutionStatus.RUNNING) { + sc.foreach(_.cancelJob(id)) + // Do a quick pause here to give Spark time to kill the job so it shows up as + // killed after the refresh. Note that this will block the serving thread so the + // time should be limited in duration. + Thread.sleep(100) + } } } } http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index 4b8c7b2..98fbd7a 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -21,46 +21,39 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.scheduler.StageInfo +import org.apache.spark.status.PoolData +import org.apache.spark.status.api.v1._ import org.apache.spark.ui.{UIUtils, WebUIPage} /** Page showing specific pool details */ private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") { - private val sc = parent.sc - private val listener = parent.progressListener def render(request: HttpServletRequest): Seq[Node] = { - listener.synchronized { - // stripXSS is called first to remove suspicious characters used in XSS attacks - val poolName = Option(UIUtils.stripXSS(request.getParameter("poolname"))).map { poolname => - UIUtils.decodeURLParameter(poolname) - }.getOrElse { - throw new IllegalArgumentException(s"Missing poolname parameter") - } - - val poolToActiveStages = listener.poolToActiveStages - val activeStages = poolToActiveStages.get(poolName) match { - case Some(s) => s.values.toSeq - case None => Seq.empty[StageInfo] - } - val shouldShowActiveStages = activeStages.nonEmpty - val activeStagesTable = - new StageTableBase(request, activeStages, "", "activeStage", parent.basePath, "stages/pool", - parent.progressListener, parent.isFairScheduler, parent.killEnabled, - isFailedStage = false) - - // For now, pool information is only accessible in live UIs - val pools = sc.map(_.getPoolForName(poolName).getOrElse { - throw new IllegalArgumentException(s"Unknown poolname: $poolName") - }).toSeq - val poolTable = new PoolTable(pools, parent) + // stripXSS is called first to remove suspicious characters used in XSS attacks + val poolName = Option(UIUtils.stripXSS(request.getParameter("poolname"))).map { poolname => + UIUtils.decodeURLParameter(poolname) + }.getOrElse { + throw new IllegalArgumentException(s"Missing poolname parameter") + } - var content = <h4>Summary </h4> ++ poolTable.toNodeSeq - if (shouldShowActiveStages) { - content ++= <h4>Active Stages ({activeStages.size})</h4> ++ activeStagesTable.toNodeSeq - } + // For now, pool information is only accessible in live UIs + val pool = parent.sc.flatMap(_.getPoolForName(poolName)).getOrElse { + throw new IllegalArgumentException(s"Unknown pool: $poolName") + } - UIUtils.headerSparkPage("Fair Scheduler Pool: " + poolName, content, parent) + val uiPool = parent.store.asOption(parent.store.pool(poolName)).getOrElse( + new PoolData(poolName, Set())) + val activeStages = uiPool.stageIds.toSeq.map(parent.store.lastStageAttempt(_)) + val activeStagesTable = + new StageTableBase(parent.store, request, activeStages, "", "activeStage", parent.basePath, + "stages/pool", parent.isFairScheduler, parent.killEnabled, false) + + val poolTable = new PoolTable(Map(pool -> uiPool), parent) + var content = <h4>Summary </h4> ++ poolTable.toNodeSeq + if (activeStages.nonEmpty) { + content ++= <h4>Active Stages ({activeStages.size})</h4> ++ activeStagesTable.toNodeSeq } + + UIUtils.headerSparkPage("Fair Scheduler Pool: " + poolName, content, parent) } } http://git-wip-us.apache.org/repos/asf/spark/blob/4741c078/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala index ea02968..5dfce85 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala @@ -19,25 +19,16 @@ package org.apache.spark.ui.jobs import java.net.URLEncoder -import scala.collection.mutable.HashMap import scala.xml.Node -import org.apache.spark.scheduler.{Schedulable, StageInfo} +import org.apache.spark.scheduler.Schedulable +import org.apache.spark.status.PoolData import org.apache.spark.ui.UIUtils /** Table showing list of pools */ -private[ui] class PoolTable(pools: Seq[Schedulable], parent: StagesTab) { - private val listener = parent.progressListener +private[ui] class PoolTable(pools: Map[Schedulable, PoolData], parent: StagesTab) { def toNodeSeq: Seq[Node] = { - listener.synchronized { - poolTable(poolRow, pools) - } - } - - private def poolTable( - makeRow: (Schedulable, HashMap[String, HashMap[Int, StageInfo]]) => Seq[Node], - rows: Seq[Schedulable]): Seq[Node] = { <table class="table table-bordered table-striped table-condensed sortable table-fixed"> <thead> <th>Pool Name</th> @@ -48,29 +39,24 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: StagesTab) { <th>SchedulingMode</th> </thead> <tbody> - {rows.map(r => makeRow(r, listener.poolToActiveStages))} + {pools.map { case (s, p) => poolRow(s, p) }} </tbody> </table> } - private def poolRow( - p: Schedulable, - poolToActiveStages: HashMap[String, HashMap[Int, StageInfo]]): Seq[Node] = { - val activeStages = poolToActiveStages.get(p.name) match { - case Some(stages) => stages.size - case None => 0 - } + private def poolRow(s: Schedulable, p: PoolData): Seq[Node] = { + val activeStages = p.stageIds.size val href = "%s/stages/pool?poolname=%s" .format(UIUtils.prependBaseUri(parent.basePath), URLEncoder.encode(p.name, "UTF-8")) <tr> <td> <a href={href}>{p.name}</a> </td> - <td>{p.minShare}</td> - <td>{p.weight}</td> + <td>{s.minShare}</td> + <td>{s.weight}</td> <td>{activeStages}</td> - <td>{p.runningTasks}</td> - <td>{p.schedulingMode}</td> + <td>{s.runningTasks}</td> + <td>{s.schedulingMode}</td> </tr> } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org