Updated Branches:
  refs/heads/master 87676a6af -> 3fb302c08

Added logging of scheduler delays to UI


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/fc78f67d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/fc78f67d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/fc78f67d

Branch: refs/heads/master
Commit: fc78f67da2fd28744e8119e28f4bb8a29926b3ad
Parents: 2fead51
Author: Kay Ousterhout <[email protected]>
Authored: Thu Nov 21 16:54:23 2013 -0800
Committer: Kay Ousterhout <[email protected]>
Committed: Thu Nov 21 16:54:23 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/ui/jobs/StagePage.scala    | 33 ++++++++++++++++++--
 1 file changed, 31 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fc78f67d/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index fbd8228..fc8c334 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -60,11 +60,13 @@ private[spark] class StagePage(parent: JobProgressUI) {
       var activeTime = 0L
       listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
 
+      val finishedTasks = listener.stageIdToTaskInfos(stageId).filter(_._1.finished)
+
       val summary =
         <div>
           <ul class="unstyled">
             <li>
-              <strong>CPU time: </strong>
+              <strong>Total duration across all tasks: </strong>
               {parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
             </li>
             {if (hasShuffleRead)
@@ -104,6 +106,30 @@ private[spark] class StagePage(parent: JobProgressUI) {
           val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
             ms => parent.formatDuration(ms.toLong))
 
+          val gettingResultTimes = validTasks.map{case (info, metrics, exception) =>
+            if (info.gettingResultTime > 0) {
+              (info.finishTime - info.gettingResultTime).toDouble
+            } else {
+              0.0
+            }
+          }
+          val gettingResultQuantiles = ("Time spent fetching task results" +:
+            Distribution(gettingResultTimes).get.getQuantiles().map(
+              millis => parent.formatDuration(millis.toLong)))
+          // The scheduler delay includes the network delay to send the task to the worker
+          // machine and to send back the result (but not the time to fetch the task result,
+          // if it needed to be fetched from the block manager on the worker).
+          val schedulerDelays = validTasks.map{case (info, metrics, exception) =>
+            if (info.gettingResultTime > 0) {
+              (info.gettingResultTime - info.launchTime).toDouble
+            } else {
+              (info.finishTime - info.launchTime).toDouble
+            }
+          }
+          val schedulerDelayQuantiles = ("Scheduler delay" +:
+            Distribution(schedulerDelays).get.getQuantiles().map(
+              millis => parent.formatDuration(millis.toLong)))
+
           def getQuantileCols(data: Seq[Double]) =
             Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong))
 
@@ -119,7 +145,10 @@ private[spark] class StagePage(parent: JobProgressUI) {
           }
           val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
 
-          val listings: Seq[Seq[String]] = Seq(serviceQuantiles,
+          val listings: Seq[Seq[String]] = Seq(
+            serviceQuantiles,
+            gettingResultQuantiles,
+            schedulerDelayQuantiles,
             if (hasShuffleRead) shuffleReadQuantiles else Nil,
             if (hasShuffleWrite) shuffleWriteQuantiles else Nil)
 

Reply via email to