Repository: spark Updated Branches: refs/heads/master 1114207cc -> 339441f54
http://git-wip-us.apache.org/repos/asf/spark/blob/339441f5/core/src/main/resources/org/apache/spark/ui/static/webui.css ---------------------------------------------------------------------- diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css index 11fd956..7448af8 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/webui.css +++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css @@ -112,3 +112,8 @@ pre { padding-bottom: 0; border: none; } + +.tooltip { + font-weight: normal; +} + http://git-wip-us.apache.org/repos/asf/spark/blob/339441f5/core/src/main/scala/org/apache/spark/ui/ToolTips.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala new file mode 100644 index 0000000..37708d7 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ui + +private[spark] object ToolTips { /* Hover text for web UI table headers and labels. */ + val SCHEDULER_DELAY = + """Scheduler delay includes time to ship the task from the scheduler to + the executor, and the time to send a message from the executor to the scheduler stating + that the task has completed. When the scheduler becomes overloaded, task completion messages + become queued up, and scheduler delay increases.""" + + val INPUT = "Bytes read from Hadoop or from Spark storage." + + val SHUFFLE_WRITE = "Bytes written to disk in order to be read by a shuffle in a future stage." + + val SHUFFLE_READ = + """Bytes read from remote executors. Typically less than shuffle write bytes + because this does not include shuffle data read locally.""" +} http://git-wip-us.apache.org/repos/asf/spark/blob/339441f5/core/src/main/scala/org/apache/spark/ui/UIUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 1b10425..9cb50d9 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -25,6 +25,7 @@ import org.apache.spark.Logging /** Utility functions for generating XML pages with spark content. */ private[spark] object UIUtils extends Logging { + val TABLE_CLASS = "table table-bordered table-striped table-condensed sortable" // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
private val dateFormat = new ThreadLocal[SimpleDateFormat]() { @@ -139,6 +140,18 @@ private[spark] object UIUtils extends Logging { def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource + def commonHeaderNodes: Seq[Node] = { /* rebuilt per call, like the replaced inline markup */ + <meta http-equiv="Content-type" content="text/html; charset=utf-8" /> + <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")} + type="text/css" /> + <link rel="stylesheet" href={prependBaseUri("/static/webui.css")} + type="text/css" /> + <script src={prependBaseUri("/static/sorttable.js")} ></script> + <script src={prependBaseUri("/static/jquery-1.11.1.min.js")}></script> + <script src={prependBaseUri("/static/bootstrap-tooltip.js")}></script> + <script src={prependBaseUri("/static/initialize-tooltips.js")}></script> + } + /** Returns a spark page with correctly formatted headers */ def headerSparkPage( content: => Seq[Node], @@ -157,12 +170,7 @@ private[spark] object UIUtils extends Logging { <html> <head> - <meta http-equiv="Content-type" content="text/html; charset=utf-8" /> - <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")} - type="text/css" /> - <link rel="stylesheet" href={prependBaseUri("/static/webui.css")} - type="text/css" /> - <script src={prependBaseUri("/static/sorttable.js")} ></script> + {commonHeaderNodes} <title>{appName} - {title}</title> </head> <body> @@ -193,11 +201,7 @@ private[spark] object UIUtils extends Logging { def basicSparkPage(content: => Seq[Node], title: String): Seq[Node] = { <html> <head> - <meta http-equiv="Content-type" content="text/html; charset=utf-8" /> - <link rel="stylesheet" href={prependBaseUri("/static/bootstrap.min.css")} - type="text/css" /> - <link rel="stylesheet" href={prependBaseUri("/static/webui.css")} type="text/css" /> - <script src={prependBaseUri("/static/sorttable.js")} ></script> + {commonHeaderNodes} <title>{title}</title> </head> <body> @@ -224,9 +228,9 @@ private[spark] object UIUtils extends Logging {
data: Seq[T], fixedWidth: Boolean = false): Seq[Node] = { - var tableClass = "table table-bordered table-striped table-condensed sortable" + var listingTableClass = TABLE_CLASS if (fixedWidth) { - tableClass += " table-fixed" + listingTableClass += " table-fixed" } val colWidth = 100.toDouble / headers.size val colWidthAttr = if (fixedWidth) colWidth + "%" else "" @@ -246,7 +250,7 @@ private[spark] object UIUtils extends Logging { } } } - <table class={tableClass}> + <table class={listingTableClass}> <thead>{headerRow}</thead> <tbody> {data.map(r => generateDataRow(r))} http://git-wip-us.apache.org/repos/asf/spark/blob/339441f5/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 95b4a4e..b358c85 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -22,9 +22,26 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.storage.StorageLevel -import org.apache.spark.ui.{WebUIPage, UIUtils} +import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} import org.apache.spark.util.Utils +/** Summary information about an executor to display in the UI.
*/ +private case class ExecutorSummaryInfo( + id: String, + hostPort: String, + rddBlocks: Int, + memoryUsed: Long, + diskUsed: Long, + activeTasks: Int, + failedTasks: Int, + completedTasks: Int, + totalTasks: Int, + totalDuration: Long, + totalInputBytes: Long, + totalShuffleRead: Long, + totalShuffleWrite: Long, + maxMemory: Long) + private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { private val appName = parent.appName private val basePath = parent.basePath @@ -36,8 +53,36 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { val memUsed = storageStatusList.map(_.memUsed).fold(0L)(_ + _) val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_ + _) val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId) - val execInfoSorted = execInfo.sortBy(_.getOrElse("Executor ID", "")) - val execTable = UIUtils.listingTable(execHeader, execRow, execInfoSorted) + val execInfoSorted = execInfo.sortBy(_.id) /* ids are Strings: lexicographic order */ + + val execTable = + <table class={UIUtils.TABLE_CLASS}> + <thead> + <th>Executor ID</th> + <th>Address</th> + <th>RDD Blocks</th> + <th>Memory Used</th> + <th>Disk Used</th> + <th>Active Tasks</th> + <th>Failed Tasks</th> + <th>Complete Tasks</th> + <th>Total Tasks</th> + <th>Task Time</th> + <th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th> + <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th> + <th> + <!-- Place the shuffle write tooltip on the left (rather than the default position + of on top) because the shuffle write column is the last column on the right side and + the tooltip is wider than the column, so it doesn't fit on top.
--> + <span data-toggle="tooltip" data-placement="left" title={ToolTips.SHUFFLE_WRITE}> + Shuffle Write + </span> + </th> + </thead> + <tbody> + {execInfoSorted.map(execRow(_))} + </tbody> + </table> val content = <div class="row-fluid"> @@ -60,53 +105,43 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { parent.headerTabs, parent) } - /** Header fields for the executors table */ - private def execHeader = Seq( - "Executor ID", - "Address", - "RDD Blocks", - "Memory Used", - "Disk Used", - "Active Tasks", - "Failed Tasks", - "Complete Tasks", - "Total Tasks", - "Task Time", - "Input Bytes", - "Shuffle Read", - "Shuffle Write") - /** Render an HTML row representing an executor */ - private def execRow(values: Map[String, String]): Seq[Node] = { - val maximumMemory = values("Maximum Memory") - val memoryUsed = values("Memory Used") - val diskUsed = values("Disk Used") - // scalastyle:off + private def execRow(info: ExecutorSummaryInfo): Seq[Node] = { + val maximumMemory = info.maxMemory + val memoryUsed = info.memoryUsed + val diskUsed = info.diskUsed <tr> - <td>{values("Executor ID")}</td> - <td>{values("Address")}</td> - <td>{values("RDD Blocks")}</td> - <td sorttable_customkey={memoryUsed}> - {Utils.bytesToString(memoryUsed.toLong)} / - {Utils.bytesToString(maximumMemory.toLong)} + <td>{info.id}</td> + <td>{info.hostPort}</td> + <td>{info.rddBlocks}</td> + <td sorttable_customkey={memoryUsed.toString}> + {Utils.bytesToString(memoryUsed)} / + {Utils.bytesToString(maximumMemory)} + </td> + <td sorttable_customkey={diskUsed.toString}> + {Utils.bytesToString(diskUsed)} + </td> + <td>{info.activeTasks}</td> + <td>{info.failedTasks}</td> + <td>{info.completedTasks}</td> + <td>{info.totalTasks}</td> + <td sorttable_customkey={info.totalDuration.toString}> + {Utils.msDurationToString(info.totalDuration)} </td> - <td sorttable_customkey={diskUsed}> - {Utils.bytesToString(diskUsed.toLong)} + <td
sorttable_customkey={info.totalInputBytes.toString}> + {Utils.bytesToString(info.totalInputBytes)} + </td> + <td sorttable_customkey={info.totalShuffleRead.toString}> + {Utils.bytesToString(info.totalShuffleRead)} + </td> + <td sorttable_customkey={info.totalShuffleWrite.toString}> + {Utils.bytesToString(info.totalShuffleWrite)} </td> - <td>{values("Active Tasks")}</td> - <td>{values("Failed Tasks")}</td> - <td>{values("Complete Tasks")}</td> - <td>{values("Total Tasks")}</td> - <td sorttable_customkey={values("Task Time")}>{Utils.msDurationToString(values("Task Time").toLong)}</td> - <td sorttable_customkey={values("Input Bytes")}>{Utils.bytesToString(values("Input Bytes").toLong)}</td> - <td sorttable_customkey={values("Shuffle Read")}>{Utils.bytesToString(values("Shuffle Read").toLong)}</td> - <td sorttable_customkey={values("Shuffle Write")} >{Utils.bytesToString(values("Shuffle Write").toLong)}</td> </tr> - // scalastyle:on } - /** Represent an executor's info as a map given a storage status index */ - private def getExecInfo(statusId: Int): Map[String, String] = { + /** Build the ExecutorSummaryInfo for the executor at the given storage status index */ + private def getExecInfo(statusId: Int): ExecutorSummaryInfo = { val status = listener.storageStatusList(statusId) val execId = status.blockManagerId.executorId val hostPort = status.blockManagerId.hostPort @@ -118,15 +153,12 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0) val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0) val totalTasks = activeTasks + failedTasks + completedTasks - val totalDuration = listener.executorToDuration.getOrElse(execId, 0) - val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0) - val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0) - val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0) + val totalDuration = listener.executorToDuration.getOrElse(execId, 0L) + val totalInputBytes =
listener.executorToInputBytes.getOrElse(execId, 0L) + val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L) + val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L) - // Also include fields not in the header - val execFields = execHeader ++ Seq("Maximum Memory") - - val execValues = Seq( + ExecutorSummaryInfo( execId, hostPort, rddBlocks, @@ -141,8 +173,6 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { totalShuffleRead, totalShuffleWrite, maxMem - ).map(_.toString) - - execFields.zip(execValues).toMap + ) } } http://git-wip-us.apache.org/repos/asf/spark/blob/339441f5/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 2a34a9a..5202095 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -20,7 +20,7 @@ package org.apache.spark.ui.jobs import scala.collection.mutable import scala.xml.Node -import org.apache.spark.ui.UIUtils +import org.apache.spark.ui.{ToolTips, UIUtils} import org.apache.spark.util.Utils /** Page showing executor summary */ @@ -35,7 +35,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) { /** Special table which merges two header cells.
*/ private def executorTable[T](): Seq[Node] = { - <table class="table table-bordered table-striped table-condensed sortable"> + <table class={UIUtils.TABLE_CLASS}> <thead> <th>Executor ID</th> <th>Address</th> @@ -43,9 +43,9 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) { <th>Total Tasks</th> <th>Failed Tasks</th> <th>Succeeded Tasks</th> - <th>Input Bytes</th> - <th>Shuffle Read</th> - <th>Shuffle Write</th> + <th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th> + <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th> + <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_WRITE}>Shuffle Write</span></th> <th>Shuffle Spill (Memory)</th> <th>Shuffle Spill (Disk)</th> </thead> http://git-wip-us.apache.org/repos/asf/spark/blob/339441f5/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index afb8ed7..8c3821b 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -22,7 +22,7 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.ui.{WebUIPage, UIUtils} +import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils} import org.apache.spark.util.{Utils, Distribution} /** Page showing statistics and task list for a given stage */ @@ -127,14 +127,14 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { metrics.get.resultSerializationTime.toDouble } val serializationQuantiles = - "Result serialization time" +: Distribution(serializationTimes). - get.getQuantiles().map(ms => UIUtils.formatDuration(ms.toLong)) + <td>Result serialization time</td> +: Distribution(serializationTimes).
+ get.getQuantiles().map(ms => <td>{UIUtils.formatDuration(ms.toLong)}</td>) val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.executorRunTime.toDouble } - val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles() - .map(ms => UIUtils.formatDuration(ms.toLong)) + val serviceQuantiles = <td>Duration</td> +: Distribution(serviceTimes).get.getQuantiles() + .map(ms => <td>{UIUtils.formatDuration(ms.toLong)}</td>) val gettingResultTimes = validTasks.map { case TaskUIData(info, _, _) => if (info.gettingResultTime > 0) { @@ -143,9 +143,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { 0.0 } } - val gettingResultQuantiles = "Time spent fetching task results" +: + val gettingResultQuantiles = <td>Time spent fetching task results</td> +: Distribution(gettingResultTimes).get.getQuantiles().map { millis => - UIUtils.formatDuration(millis.toLong) + <td>{UIUtils.formatDuration(millis.toLong)}</td> } // The scheduler delay includes the network delay to send the task to the worker // machine and to send back the result (but not the time to fetch the task result, @@ -160,42 +160,45 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { } totalExecutionTime - metrics.get.executorRunTime } - val schedulerDelayQuantiles = "Scheduler delay" +: + val schedulerDelayTitle = <td><span data-toggle="tooltip" + title={ToolTips.SCHEDULER_DELAY} data-placement="right">Scheduler delay</span></td> + val schedulerDelayQuantiles = schedulerDelayTitle +: /* label cell, then quantile cells */ Distribution(schedulerDelays).get.getQuantiles().map { millis => - UIUtils.formatDuration(millis.toLong) + <td>{UIUtils.formatDuration(millis.toLong)}</td> } def getQuantileCols(data: Seq[Double]) = - Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong)) + Distribution(data).get.getQuantiles().map(d => <td>{Utils.bytesToString(d.toLong)}</td>) val inputSizes = validTasks.map { case TaskUIData(_, metrics,
_) => metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble } - val inputQuantiles = "Input" +: getQuantileCols(inputSizes) + val inputQuantiles = <td>Input</td> +: getQuantileCols(inputSizes) val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble } - val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes) + val shuffleReadQuantiles = <td>Shuffle Read (Remote)</td> +: + getQuantileCols(shuffleReadSizes) val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble } - val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes) + val shuffleWriteQuantiles = <td>Shuffle Write</td> +: getQuantileCols(shuffleWriteSizes) val memoryBytesSpilledSizes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.memoryBytesSpilled.toDouble } - val memoryBytesSpilledQuantiles = "Shuffle spill (memory)" +: + val memoryBytesSpilledQuantiles = <td>Shuffle spill (memory)</td> +: getQuantileCols(memoryBytesSpilledSizes) val diskBytesSpilledSizes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.diskBytesSpilled.toDouble } - val diskBytesSpilledQuantiles = "Shuffle spill (disk)" +: + val diskBytesSpilledQuantiles = <td>Shuffle spill (disk)</td> +: getQuantileCols(diskBytesSpilledSizes) - val listings: Seq[Seq[String]] = Seq( + val listings: Seq[Seq[Node]] = Seq( /* per metric row: label <td> + quantile <td>s */ serializationQuantiles, serviceQuantiles, gettingResultQuantiles, @@ -208,7 +211,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { val quantileHeaders = Seq("Metric", "Min", "25th percentile", "Median", "75th percentile", "Max") - def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr> + def quantileRow(data: Seq[Node]): Seq[Node] = <tr>{data}</tr> /* cells are already <td>s */
Some(UIUtils.listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true)) } val executorTable = new ExecutorTable(stageId, parent) http://git-wip-us.apache.org/repos/asf/spark/blob/339441f5/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index a9ac6d5..4013c6f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.HashMap import scala.xml.Node import org.apache.spark.scheduler.{StageInfo, TaskInfo} -import org.apache.spark.ui.UIUtils +import org.apache.spark.ui.{ToolTips, UIUtils} import org.apache.spark.util.Utils /** Page showing list of all ongoing and recently finished stages */ @@ -43,9 +43,16 @@ private[ui] class StageTableBase( <th>Submitted</th> <th>Duration</th> <th>Tasks: Succeeded/Total</th> - <th>Input</th> - <th>Shuffle Read</th> - <th>Shuffle Write</th> + <th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th> + <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th> + <th> + <!-- Place the shuffle write tooltip on the left (rather than the default position + of on top) because the shuffle write column is the last column on the right side and + the tooltip is wider than the column, so it doesn't fit on top. --> + <span data-toggle="tooltip" data-placement="left" title={ToolTips.SHUFFLE_WRITE}> + Shuffle Write + </span> + </th> } def toNodeSeq: Seq[Node] = {
