Repository: spark
Updated Branches:
  refs/heads/branch-1.1 a5ae72074 -> 0506539b0


SPARK-2900. aggregate inputBytes per stage

Author: Sandy Ryza <[email protected]>

Closes #1826 from sryza/sandy-spark-2900 and squashes the following commits:

43f9091 [Sandy Ryza] SPARK-2900
(cherry picked from commit df652ea02a3e42d987419308ef14874300347373)

Signed-off-by: Patrick Wendell <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0506539b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0506539b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0506539b

Branch: refs/heads/branch-1.1
Commit: 0506539b0e853d474183078814fb0f550bfbbd67
Parents: a5ae720
Author: Sandy Ryza <[email protected]>
Authored: Sun Aug 17 22:39:06 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Sun Aug 17 22:39:14 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/ui/jobs/JobProgressListener.scala      | 6 ++++++
 .../org/apache/spark/ui/jobs/JobProgressListenerSuite.scala | 9 ++++++++-
 2 files changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0506539b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index a3e9566..74cd637 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -200,6 +200,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     stageData.shuffleReadBytes += shuffleReadDelta
     execSummary.shuffleRead += shuffleReadDelta
 
+    val inputBytesDelta =
+      (taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L)
+      - oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L))
+    stageData.inputBytes += inputBytesDelta
+    execSummary.inputBytes += inputBytesDelta
+
     val diskSpillDelta =
       taskMetrics.diskBytesSpilled - oldMetrics.map(_.diskBytesSpilled).getOrElse(0L)
     stageData.diskBytesSpilled += diskSpillDelta

http://git-wip-us.apache.org/repos/asf/spark/blob/0506539b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index f5ba31c..147ec0b 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -22,7 +22,7 @@ import org.scalatest.Matchers
 
 import org.apache.spark._
 import org.apache.spark.{LocalSparkContext, SparkConf, Success}
-import org.apache.spark.executor.{ShuffleWriteMetrics, ShuffleReadMetrics, TaskMetrics}
+import org.apache.spark.executor._
 import org.apache.spark.scheduler._
 import org.apache.spark.util.Utils
 
@@ -150,6 +150,9 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       taskMetrics.executorRunTime = base + 4
       taskMetrics.diskBytesSpilled = base + 5
       taskMetrics.memoryBytesSpilled = base + 6
+      val inputMetrics = new InputMetrics(DataReadMethod.Hadoop)
+      taskMetrics.inputMetrics = Some(inputMetrics)
+      inputMetrics.bytesRead = base + 7
       taskMetrics
     }
 
@@ -182,6 +185,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     assert(stage1Data.diskBytesSpilled == 205)
     assert(stage0Data.memoryBytesSpilled == 112)
     assert(stage1Data.memoryBytesSpilled == 206)
+    assert(stage0Data.inputBytes == 114)
+    assert(stage1Data.inputBytes == 207)
     assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
       .totalBlocksFetched == 2)
     assert(stage0Data.taskData.get(1235L).get.taskMetrics.get.shuffleReadMetrics.get
@@ -208,6 +213,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     assert(stage1Data.diskBytesSpilled == 610)
     assert(stage0Data.memoryBytesSpilled == 412)
     assert(stage1Data.memoryBytesSpilled == 612)
+    assert(stage0Data.inputBytes == 414)
+    assert(stage1Data.inputBytes == 614)
     assert(stage0Data.taskData.get(1234L).get.taskMetrics.get.shuffleReadMetrics.get
       .totalBlocksFetched == 302)
     assert(stage1Data.taskData.get(1237L).get.taskMetrics.get.shuffleReadMetrics.get


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to