Repository: spark
Updated Branches:
  refs/heads/master 9f603fce7 -> 838a48036


[SPARK-5982] Remove incorrect Local Read Time Metric

This metric is incomplete because the shuffle files are memory-mapped, so much of
the actual disk read happens later, when tasks read the file's data.

This should be merged into 1.3, so that we never expose this incorrect metric 
to users.

CC pwendell ksakellis sryza

Author: Kay Ousterhout <[email protected]>

Closes #4749 from kayousterhout/SPARK-5982 and squashes the following commits:

9737b5e [Kay Ousterhout] More fixes
a1eb300 [Kay Ousterhout] Removed one more use of local read time
cf13497 [Kay Ousterhout] [SPARK-5982] Remove incorrect Local Read Time Metric


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/838a4803
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/838a4803
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/838a4803

Branch: refs/heads/master
Commit: 838a48036c050cef03b8c3620e16b5495cd7beab
Parents: 9f603fc
Author: Kay Ousterhout <[email protected]>
Authored: Wed Feb 25 14:55:24 2015 -0800
Committer: Kay Ousterhout <[email protected]>
Committed: Wed Feb 25 14:55:24 2015 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/executor/TaskMetrics.scala   | 8 --------
 .../main/scala/org/apache/spark/scheduler/JobLogger.scala    | 1 -
 .../apache/spark/storage/ShuffleBlockFetcherIterator.scala   | 2 --
 core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 2 --
 .../test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 3 ---
 5 files changed, 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/838a4803/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala 
b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index df36566..07b1526 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -203,7 +203,6 @@ class TaskMetrics extends Serializable {
         merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched)
         merged.incRemoteBytesRead(depMetrics.remoteBytesRead)
         merged.incLocalBytesRead(depMetrics.localBytesRead)
-        merged.incLocalReadTime(depMetrics.localReadTime)
         merged.incRecordsRead(depMetrics.recordsRead)
       }
       _shuffleReadMetrics = Some(merged)
@@ -346,13 +345,6 @@ class ShuffleReadMetrics extends Serializable {
   private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= 
value
 
   /**
-   * Time the task spent (in milliseconds) reading local shuffle blocks (from 
the local disk).
-   */
-  private var _localReadTime: Long = _
-  def localReadTime = _localReadTime
-  private[spark] def incLocalReadTime(value: Long) = _localReadTime += value
-
-  /**
    * Shuffle data that was read from the local disk (as opposed to from a 
remote executor).
    */
   private var _localBytesRead: Long = _

http://git-wip-us.apache.org/repos/asf/spark/blob/838a4803/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala 
b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index f9fc8aa..8aa528a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -170,7 +170,6 @@ class JobLogger(val user: String, val logDirName: String) 
extends SparkListener
         " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
         " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
         " REMOTE_BYTES_READ=" + metrics.remoteBytesRead +
-        " LOCAL_READ_TIME=" + metrics.localReadTime +
         " LOCAL_BYTES_READ=" + metrics.localBytesRead
       case None => ""
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/838a4803/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 2ebb799..8f28ef4 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -228,7 +228,6 @@ final class ShuffleBlockFetcherIterator(
    * track in-memory are the ManagedBuffer references themselves.
    */
   private[this] def fetchLocalBlocks() {
-    val startTime = System.currentTimeMillis
     val iter = localBlocks.iterator
     while (iter.hasNext) {
       val blockId = iter.next()
@@ -246,7 +245,6 @@ final class ShuffleBlockFetcherIterator(
           return
       }
     }
-    shuffleMetrics.incLocalReadTime(System.currentTimeMillis - startTime)
   }
 
   private[this] def initialize(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/838a4803/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala 
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 58d37e2..8e20864 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -294,7 +294,6 @@ private[spark] object JsonProtocol {
     ("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~
     ("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~
     ("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead) ~
-    ("Local Read Time" -> shuffleReadMetrics.localReadTime) ~
     ("Local Bytes Read" -> shuffleReadMetrics.localBytesRead) ~
     ("Total Records Read" -> shuffleReadMetrics.recordsRead)
   }
@@ -676,7 +675,6 @@ private[spark] object JsonProtocol {
     metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int])
     metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long])
     metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long])
-    metrics.incLocalReadTime((json \ "Local Read 
Time").extractOpt[Long].getOrElse(0))
     metrics.incLocalBytesRead((json \ "Local Bytes 
Read").extractOpt[Long].getOrElse(0))
     metrics.incRecordsRead((json \ "Total Records 
Read").extractOpt[Long].getOrElse(0))
     metrics

http://git-wip-us.apache.org/repos/asf/spark/blob/838a4803/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index c181baf..a2be724 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -270,7 +270,6 @@ class JsonProtocolSuite extends FunSuite {
       .removeField { case (field, _) => field == "Local Read Time" }
     val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
     assert(newMetrics.shuffleReadMetrics.get.localBytesRead == 0)
-    assert(newMetrics.shuffleReadMetrics.get.localReadTime == 0)
   }
 
   test("SparkListenerApplicationStart backwards compatibility") {
@@ -708,7 +707,6 @@ class JsonProtocolSuite extends FunSuite {
       sr.incFetchWaitTime(a + d)
       sr.incRemoteBlocksFetched(f)
       sr.incRecordsRead(if (hasRecords) (b + d) / 100 else -1)
-      sr.incLocalReadTime(a + e)
       sr.incLocalBytesRead(a + f)
       t.setShuffleReadMetrics(Some(sr))
     }
@@ -956,7 +954,6 @@ class JsonProtocolSuite extends FunSuite {
       |      "Local Blocks Fetched": 700,
       |      "Fetch Wait Time": 900,
       |      "Remote Bytes Read": 1000,
-      |      "Local Read Time": 1000,
       |      "Local Bytes Read": 1100,
       |      "Total Records Read" : 10
       |    },


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to