Updated Branches: refs/heads/master 87676a6af -> 3fb302c08
Added logging of scheduler delays to UI Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/fc78f67d Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/fc78f67d Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/fc78f67d Branch: refs/heads/master Commit: fc78f67da2fd28744e8119e28f4bb8a29926b3ad Parents: 2fead51 Author: Kay Ousterhout <[email protected]> Authored: Thu Nov 21 16:54:23 2013 -0800 Committer: Kay Ousterhout <[email protected]> Committed: Thu Nov 21 16:54:23 2013 -0800 ---------------------------------------------------------------------- .../org/apache/spark/ui/jobs/StagePage.scala | 33 ++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fc78f67d/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index fbd8228..fc8c334 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -60,11 +60,13 @@ private[spark] class StagePage(parent: JobProgressUI) { var activeTime = 0L listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now)) + val finishedTasks = listener.stageIdToTaskInfos(stageId).filter(_._1.finished) + val summary = <div> <ul class="unstyled"> <li> - <strong>CPU time: </strong> + <strong>Total duration across all tasks: </strong> {parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)} </li> {if (hasShuffleRead) @@ -104,6 +106,30 @@ private[spark] class StagePage(parent: JobProgressUI) { val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map( ms => parent.formatDuration(ms.toLong)) + val gettingResultTimes = validTasks.map{case (info, metrics, exception) => + if (info.gettingResultTime > 0) { + (info.finishTime - info.gettingResultTime).toDouble + } else { + 0.0 + } + } + val gettingResultQuantiles = ("Time spent fetching task results" +: + Distribution(gettingResultTimes).get.getQuantiles().map( + millis => parent.formatDuration(millis.toLong))) + // The scheduler delay includes the network delay to send the task to the worker + // machine and to send back the result (but not the time to fetch the task result, + // if it needed to be fetched from the block manager on the worker). + val schedulerDelays = validTasks.map{case (info, metrics, exception) => + if (info.gettingResultTime > 0) { + (info.gettingResultTime - info.launchTime).toDouble + } else { + (info.finishTime - info.launchTime).toDouble + } + } + val schedulerDelayQuantiles = ("Scheduler delay" +: + Distribution(schedulerDelays).get.getQuantiles().map( + millis => parent.formatDuration(millis.toLong))) + def getQuantileCols(data: Seq[Double]) = Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong)) @@ -119,7 +145,10 @@ private[spark] class StagePage(parent: JobProgressUI) { } val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes) - val listings: Seq[Seq[String]] = Seq(serviceQuantiles, + val listings: Seq[Seq[String]] = Seq( + serviceQuantiles, + gettingResultQuantiles, + schedulerDelayQuantiles, if (hasShuffleRead) shuffleReadQuantiles else Nil, if (hasShuffleWrite) shuffleWriteQuantiles else Nil)
