Repository: spark
Updated Branches:
  refs/heads/master 90b46e014 -> 8028a2888


http://git-wip-us.apache.org/repos/asf/spark/blob/8028a288/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index b854d74..5ba67af 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -266,18 +266,13 @@ class SparkListenerSuite extends SparkFunSuite with 
LocalSparkContext with Match
       taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
         taskMetrics.resultSize should be > (0L)
         if (stageInfo.rddInfos.exists(info => info.name == d2.name || 
info.name == d3.name)) {
-          taskMetrics.inputMetrics should not be ('defined)
-          taskMetrics.outputMetrics should not be ('defined)
-          taskMetrics.shuffleWriteMetrics should be ('defined)
-          taskMetrics.shuffleWriteMetrics.get.bytesWritten should be > (0L)
+          assert(taskMetrics.shuffleWriteMetrics.bytesWritten > 0L)
         }
         if (stageInfo.rddInfos.exists(_.name == d4.name)) {
-          taskMetrics.shuffleReadMetrics should be ('defined)
-          val sm = taskMetrics.shuffleReadMetrics.get
-          sm.totalBlocksFetched should be (2*numSlices)
-          sm.localBlocksFetched should be (2*numSlices)
-          sm.remoteBlocksFetched should be (0)
-          sm.remoteBytesRead should be (0L)
+          assert(taskMetrics.shuffleReadMetrics.totalBlocksFetched == 2 * 
numSlices)
+          assert(taskMetrics.shuffleReadMetrics.localBlocksFetched == 2 * 
numSlices)
+          assert(taskMetrics.shuffleReadMetrics.remoteBlocksFetched == 0)
+          assert(taskMetrics.shuffleReadMetrics.remoteBytesRead == 0L)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/8028a288/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
 
b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index 16418f8..5132384 100644
--- 
a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -144,7 +144,7 @@ class BypassMergeSortShuffleWriterSuite extends 
SparkFunSuite with BeforeAndAfte
     assert(outputFile.exists())
     assert(outputFile.length() === 0)
     assert(temporaryFilesCreated.isEmpty)
-    val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get
+    val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics
     assert(shuffleWriteMetrics.bytesWritten === 0)
     assert(shuffleWriteMetrics.recordsWritten === 0)
     assert(taskMetrics.diskBytesSpilled === 0)
@@ -168,7 +168,7 @@ class BypassMergeSortShuffleWriterSuite extends 
SparkFunSuite with BeforeAndAfte
     assert(writer.getPartitionLengths.sum === outputFile.length())
     assert(writer.getPartitionLengths.count(_ == 0L) === 4) // should be 4 
zero length files
     assert(temporaryFilesCreated.count(_.exists()) === 0) // check that 
temporary files were deleted
-    val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get
+    val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics
     assert(shuffleWriteMetrics.bytesWritten === outputFile.length())
     assert(shuffleWriteMetrics.recordsWritten === records.length)
     assert(taskMetrics.diskBytesSpilled === 0)

http://git-wip-us.apache.org/repos/asf/spark/blob/8028a288/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 7d4c086..85c877e 100644
--- 
a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -184,7 +184,7 @@ class JobProgressListenerSuite extends SparkFunSuite with 
LocalSparkContext with
     val conf = new SparkConf()
     val listener = new JobProgressListener(conf)
     val taskMetrics = new TaskMetrics()
-    val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics()
+    val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics()
     assert(listener.stageIdToData.size === 0)
 
     // finish this task, should get updated shuffleRead
@@ -272,10 +272,10 @@ class JobProgressListenerSuite extends SparkFunSuite with 
LocalSparkContext with
       val accums = InternalAccumulator.createAll()
       accums.foreach(Accumulators.register)
       val taskMetrics = new TaskMetrics(accums)
-      val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics()
-      val shuffleWriteMetrics = taskMetrics.registerShuffleWriteMetrics()
-      val inputMetrics = 
taskMetrics.registerInputMetrics(DataReadMethod.Hadoop)
-      val outputMetrics = 
taskMetrics.registerOutputMetrics(DataWriteMethod.Hadoop)
+      val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics()
+      val shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics
+      val inputMetrics = taskMetrics.inputMetrics
+      val outputMetrics = taskMetrics.outputMetrics
       shuffleReadMetrics.incRemoteBytesRead(base + 1)
       shuffleReadMetrics.incLocalBytesRead(base + 9)
       shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
@@ -322,12 +322,13 @@ class JobProgressListenerSuite extends SparkFunSuite with 
LocalSparkContext with
     assert(stage1Data.inputBytes == 207)
     assert(stage0Data.outputBytes == 116)
     assert(stage1Data.outputBytes == 208)
-    
assert(stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.get
-      .totalBlocksFetched == 2)
-    
assert(stage0Data.taskData.get(1235L).get.metrics.get.shuffleReadMetrics.get
-      .totalBlocksFetched == 102)
-    
assert(stage1Data.taskData.get(1236L).get.metrics.get.shuffleReadMetrics.get
-      .totalBlocksFetched == 202)
+
+    assert(
+      
stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched
 == 2)
+    assert(
+      
stage0Data.taskData.get(1235L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched
 == 102)
+    assert(
+      
stage1Data.taskData.get(1236L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched
 == 202)
 
     // task that was included in a heartbeat
     listener.onTaskEnd(SparkListenerTaskEnd(0, 0, taskType, Success, 
makeTaskInfo(1234L, 1),
@@ -355,9 +356,9 @@ class JobProgressListenerSuite extends SparkFunSuite with 
LocalSparkContext with
     assert(stage1Data.inputBytes == 614)
     assert(stage0Data.outputBytes == 416)
     assert(stage1Data.outputBytes == 616)
-    
assert(stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.get
-      .totalBlocksFetched == 302)
-    
assert(stage1Data.taskData.get(1237L).get.metrics.get.shuffleReadMetrics.get
-      .totalBlocksFetched == 402)
+    assert(
+      
stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched
 == 302)
+    assert(
+      
stage1Data.taskData.get(1237L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched
 == 402)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8028a288/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala 
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index de6f408..612c7c1 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -197,49 +197,41 @@ class JsonProtocolSuite extends SparkFunSuite {
   test("InputMetrics backward compatibility") {
     // InputMetrics were added after 1.0.1.
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, 
hasOutput = false)
-    assert(metrics.inputMetrics.nonEmpty)
     val newJson = JsonProtocol.taskMetricsToJson(metrics)
     val oldJson = newJson.removeField { case (field, _) => field == "Input 
Metrics" }
     val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
-    assert(newMetrics.inputMetrics.isEmpty)
   }
 
   test("Input/Output records backwards compatibility") {
     // records read were added after 1.2
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
       hasHadoopInput = true, hasOutput = true, hasRecords = false)
-    assert(metrics.inputMetrics.nonEmpty)
-    assert(metrics.outputMetrics.nonEmpty)
     val newJson = JsonProtocol.taskMetricsToJson(metrics)
     val oldJson = newJson.removeField { case (field, _) => field == "Records 
Read" }
                          .removeField { case (field, _) => field == "Records 
Written" }
     val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
-    assert(newMetrics.inputMetrics.get.recordsRead == 0)
-    assert(newMetrics.outputMetrics.get.recordsWritten == 0)
+    assert(newMetrics.inputMetrics.recordsRead == 0)
+    assert(newMetrics.outputMetrics.recordsWritten == 0)
   }
 
   test("Shuffle Read/Write records backwards compatibility") {
     // records read were added after 1.2
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
       hasHadoopInput = false, hasOutput = false, hasRecords = false)
-    assert(metrics.shuffleReadMetrics.nonEmpty)
-    assert(metrics.shuffleWriteMetrics.nonEmpty)
     val newJson = JsonProtocol.taskMetricsToJson(metrics)
     val oldJson = newJson.removeField { case (field, _) => field == "Total 
Records Read" }
                          .removeField { case (field, _) => field == "Shuffle 
Records Written" }
     val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
-    assert(newMetrics.shuffleReadMetrics.get.recordsRead == 0)
-    assert(newMetrics.shuffleWriteMetrics.get.recordsWritten == 0)
+    assert(newMetrics.shuffleReadMetrics.recordsRead == 0)
+    assert(newMetrics.shuffleWriteMetrics.recordsWritten == 0)
   }
 
   test("OutputMetrics backward compatibility") {
     // OutputMetrics were added after 1.1
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = 
false, hasOutput = true)
-    assert(metrics.outputMetrics.nonEmpty)
     val newJson = JsonProtocol.taskMetricsToJson(metrics)
     val oldJson = newJson.removeField { case (field, _) => field == "Output 
Metrics" }
     val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
-    assert(newMetrics.outputMetrics.isEmpty)
   }
 
   test("BlockManager events backward compatibility") {
@@ -279,11 +271,10 @@ class JsonProtocolSuite extends SparkFunSuite {
     // Metrics about local shuffle bytes read were added in 1.3.1.
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
       hasHadoopInput = false, hasOutput = false, hasRecords = false)
-    assert(metrics.shuffleReadMetrics.nonEmpty)
     val newJson = JsonProtocol.taskMetricsToJson(metrics)
     val oldJson = newJson.removeField { case (field, _) => field == "Local 
Bytes Read" }
     val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
-    assert(newMetrics.shuffleReadMetrics.get.localBytesRead == 0)
+    assert(newMetrics.shuffleReadMetrics.localBytesRead == 0)
   }
 
   test("SparkListenerApplicationStart backwards compatibility") {
@@ -423,7 +414,6 @@ class JsonProtocolSuite extends SparkFunSuite {
     })
     testAccumValue(Some(RESULT_SIZE), 3L, JInt(3))
     testAccumValue(Some(shuffleRead.REMOTE_BLOCKS_FETCHED), 2, JInt(2))
-    testAccumValue(Some(input.READ_METHOD), "aka", JString("aka"))
     testAccumValue(Some(UPDATED_BLOCK_STATUSES), blocks, blocksJson)
     // For anything else, we just cast the value to a string
     testAccumValue(Some("anything"), blocks, JString(blocks.toString))
@@ -619,12 +609,9 @@ private[spark] object JsonProtocolSuite extends Assertions 
{
     assert(metrics1.resultSerializationTime === 
metrics2.resultSerializationTime)
     assert(metrics1.memoryBytesSpilled === metrics2.memoryBytesSpilled)
     assert(metrics1.diskBytesSpilled === metrics2.diskBytesSpilled)
-    assertOptionEquals(
-      metrics1.shuffleReadMetrics, metrics2.shuffleReadMetrics, 
assertShuffleReadEquals)
-    assertOptionEquals(
-      metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics, 
assertShuffleWriteEquals)
-    assertOptionEquals(
-      metrics1.inputMetrics, metrics2.inputMetrics, assertInputMetricsEquals)
+    assertEquals(metrics1.shuffleReadMetrics, metrics2.shuffleReadMetrics)
+    assertEquals(metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics)
+    assertEquals(metrics1.inputMetrics, metrics2.inputMetrics)
     assertBlocksEquals(metrics1.updatedBlockStatuses, 
metrics2.updatedBlockStatuses)
   }
 
@@ -641,7 +628,6 @@ private[spark] object JsonProtocolSuite extends Assertions {
   }
 
   private def assertEquals(metrics1: InputMetrics, metrics2: InputMetrics) {
-    assert(metrics1.readMethod === metrics2.readMethod)
     assert(metrics1.bytesRead === metrics2.bytesRead)
   }
 
@@ -706,12 +692,13 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
   }
 
   private def assertJsonStringEquals(expected: String, actual: String, 
metadata: String) {
-    val formatJsonString = (json: String) => json.replaceAll("[\\s|]", "")
-    if (formatJsonString(expected) != formatJsonString(actual)) {
+    val expectedJson = pretty(parse(expected))
+    val actualJson = pretty(parse(actual))
+    if (expectedJson != actualJson) {
       // scalastyle:off
       // This prints something useful if the JSON strings don't match
-      println("=== EXPECTED ===\n" + pretty(parse(expected)) + "\n")
-      println("=== ACTUAL ===\n" + pretty(parse(actual)) + "\n")
+      println("=== EXPECTED ===\n" + expectedJson + "\n")
+      println("=== ACTUAL ===\n" + actualJson + "\n")
       // scalastyle:on
       throw new TestFailedException(s"$metadata JSON did not equal", 1)
     }
@@ -740,22 +727,6 @@ private[spark] object JsonProtocolSuite extends Assertions 
{
    * Use different names for methods we pass in to assertSeqEquals or 
assertOptionEquals
    */
 
-  private def assertShuffleReadEquals(r1: ShuffleReadMetrics, r2: 
ShuffleReadMetrics) {
-    assertEquals(r1, r2)
-  }
-
-  private def assertShuffleWriteEquals(w1: ShuffleWriteMetrics, w2: 
ShuffleWriteMetrics) {
-    assertEquals(w1, w2)
-  }
-
-  private def assertInputMetricsEquals(i1: InputMetrics, i2: InputMetrics) {
-    assertEquals(i1, i2)
-  }
-
-  private def assertTaskMetricsEquals(t1: TaskMetrics, t2: TaskMetrics) {
-    assertEquals(t1, t2)
-  }
-
   private def assertBlocksEquals(
       blocks1: Seq[(BlockId, BlockStatus)],
       blocks2: Seq[(BlockId, BlockStatus)]) = {
@@ -851,11 +822,11 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
     t.incMemoryBytesSpilled(a + c)
 
     if (hasHadoopInput) {
-      val inputMetrics = t.registerInputMetrics(DataReadMethod.Hadoop)
+      val inputMetrics = t.inputMetrics
       inputMetrics.setBytesRead(d + e + f)
       inputMetrics.incRecordsRead(if (hasRecords) (d + e + f) / 100 else -1)
     } else {
-      val sr = t.registerTempShuffleReadMetrics()
+      val sr = t.createTempShuffleReadMetrics()
       sr.incRemoteBytesRead(b + d)
       sr.incLocalBlocksFetched(e)
       sr.incFetchWaitTime(a + d)
@@ -865,11 +836,10 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       t.mergeShuffleReadMetrics()
     }
     if (hasOutput) {
-      val outputMetrics = t.registerOutputMetrics(DataWriteMethod.Hadoop)
-      outputMetrics.setBytesWritten(a + b + c)
-      outputMetrics.setRecordsWritten(if (hasRecords) (a + b + c)/100 else -1)
+      t.outputMetrics.setBytesWritten(a + b + c)
+      t.outputMetrics.setRecordsWritten(if (hasRecords) (a + b + c) / 100 else 
-1)
     } else {
-      val sw = t.registerShuffleWriteMetrics()
+      val sw = t.shuffleWriteMetrics
       sw.incBytesWritten(a + b + c)
       sw.incWriteTime(b + c + d)
       sw.incRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
@@ -896,7 +866,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |    "Stage Name": "greetings",
       |    "Number of Tasks": 200,
       |    "RDD Info": [],
-      |    "ParentIDs" : [100, 200, 300],
+      |    "Parent IDs" : [100, 200, 300],
       |    "Details": "details",
       |    "Accumulables": [
       |      {
@@ -924,7 +894,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |    "Ukraine": "Kiev"
       |  }
       |}
-    """
+    """.stripMargin
 
   private val stageCompletedJsonString =
     """
@@ -953,7 +923,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |        "Disk Size": 501
       |      }
       |    ],
-      |    "ParentIDs" : [100, 200, 300],
+      |    "Parent IDs" : [100, 200, 300],
       |    "Details": "details",
       |    "Accumulables": [
       |      {
@@ -975,7 +945,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |    ]
       |  }
       |}
-    """
+    """.stripMargin
 
   private val taskStartJsonString =
     """
@@ -1223,7 +1193,6 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |      "Shuffle Records Written": 12
       |    },
       |    "Input Metrics": {
-      |      "Data Read Method": "Hadoop",
       |      "Bytes Read": 2100,
       |      "Records Read": 21
       |    },
@@ -1244,7 +1213,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |    ]
       |  }
       |}
-    """
+    """.stripMargin
 
   private val taskEndWithOutputJsonString =
     """
@@ -1304,12 +1273,10 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |    "Memory Bytes Spilled": 800,
       |    "Disk Bytes Spilled": 0,
       |    "Input Metrics": {
-      |      "Data Read Method": "Hadoop",
       |      "Bytes Read": 2100,
       |      "Records Read": 21
       |    },
       |    "Output Metrics": {
-      |      "Data Write Method": "Hadoop",
       |      "Bytes Written": 1200,
       |      "Records Written": 12
       |    },
@@ -1330,7 +1297,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |    ]
       |  }
       |}
-    """
+    """.stripMargin
 
   private val jobStartJsonString =
     """
@@ -1422,7 +1389,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |          "Disk Size": 1001
       |        }
       |      ],
-      |      "ParentIDs" : [100, 200, 300],
+      |      "Parent IDs" : [100, 200, 300],
       |      "Details": "details",
       |      "Accumulables": [
       |        {
@@ -1498,7 +1465,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |          "Disk Size": 1502
       |        }
       |      ],
-      |      "ParentIDs" : [100, 200, 300],
+      |      "Parent IDs" : [100, 200, 300],
       |      "Details": "details",
       |      "Accumulables": [
       |        {
@@ -1590,7 +1557,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |          "Disk Size": 2003
       |        }
       |      ],
-      |      "ParentIDs" : [100, 200, 300],
+      |      "Parent IDs" : [100, 200, 300],
       |      "Details": "details",
       |      "Accumulables": [
       |        {
@@ -1625,7 +1592,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |    "Ukraine": "Kiev"
       |  }
       |}
-    """
+    """.stripMargin
 
   private val jobEndJsonString =
     """
@@ -1637,7 +1604,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |    "Result": "JobSucceeded"
       |  }
       |}
-    """
+    """.stripMargin
 
   private val environmentUpdateJsonString =
     """
@@ -1658,7 +1625,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |    "Super library": "/tmp/super_library"
       |  }
       |}
-    """
+    """.stripMargin
 
   private val blockManagerAddedJsonString =
     """
@@ -1672,7 +1639,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |  "Maximum Memory": 500,
       |  "Timestamp": 1
       |}
-    """
+    """.stripMargin
 
   private val blockManagerRemovedJsonString =
     """
@@ -1685,7 +1652,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |  },
       |  "Timestamp": 2
       |}
-    """
+    """.stripMargin
 
   private val unpersistRDDJsonString =
     """
@@ -1693,7 +1660,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |  "Event": "SparkListenerUnpersistRDD",
       |  "RDD ID": 12345
       |}
-    """
+    """.stripMargin
 
   private val applicationStartJsonString =
     """
@@ -1705,7 +1672,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |  "User": "Garfield",
       |  "App Attempt ID": "appAttempt"
       |}
-    """
+    """.stripMargin
 
   private val applicationStartJsonWithLogUrlsString =
     """
@@ -1721,7 +1688,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |      "stdout" : "mystdout"
       |  }
       |}
-    """
+    """.stripMargin
 
   private val applicationEndJsonString =
     """
@@ -1729,7 +1696,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |  "Event": "SparkListenerApplicationEnd",
       |  "Timestamp": 42
       |}
-    """
+    """.stripMargin
 
   private val executorAddedJsonString =
     s"""
@@ -1746,7 +1713,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |    }
       |  }
       |}
-    """
+    """.stripMargin
 
   private val executorRemovedJsonString =
     s"""
@@ -1756,7 +1723,7 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |  "Executor ID": "exec2",
       |  "Removed Reason": "test reason"
       |}
-    """
+    """.stripMargin
 
   private val executorMetricsUpdateJsonString =
     s"""
@@ -1830,16 +1797,16 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |          "Name": "$UPDATED_BLOCK_STATUSES",
       |          "Update": [
       |            {
-      |              "BlockID": "rdd_0_0",
+      |              "Block ID": "rdd_0_0",
       |              "Status": {
-      |                "StorageLevel": {
-      |                  "UseDisk": true,
-      |                  "UseMemory": true,
+      |                "Storage Level": {
+      |                  "Use Disk": true,
+      |                  "Use Memory": true,
       |                  "Deserialized": false,
       |                  "Replication": 2
       |                },
-      |                "MemorySize": 0,
-      |                "DiskSize": 0
+      |                "Memory Size": 0,
+      |                "Disk Size": 0
       |              }
       |            }
       |          ],
@@ -1911,48 +1878,34 @@ private[spark] object JsonProtocolSuite extends 
Assertions {
       |        },
       |        {
       |          "ID": 18,
-      |          "Name": "${input.READ_METHOD}",
-      |          "Update": "Hadoop",
-      |          "Internal": true,
-      |          "Count Failed Values": true
-      |        },
-      |        {
-      |          "ID": 19,
       |          "Name": "${input.BYTES_READ}",
       |          "Update": 2100,
       |          "Internal": true,
       |          "Count Failed Values": true
       |        },
       |        {
-      |          "ID": 20,
+      |          "ID": 19,
       |          "Name": "${input.RECORDS_READ}",
       |          "Update": 21,
       |          "Internal": true,
       |          "Count Failed Values": true
       |        },
       |        {
-      |          "ID": 21,
-      |          "Name": "${output.WRITE_METHOD}",
-      |          "Update": "Hadoop",
-      |          "Internal": true,
-      |          "Count Failed Values": true
-      |        },
-      |        {
-      |          "ID": 22,
+      |          "ID": 20,
       |          "Name": "${output.BYTES_WRITTEN}",
       |          "Update": 1200,
       |          "Internal": true,
       |          "Count Failed Values": true
       |        },
       |        {
-      |          "ID": 23,
+      |          "ID": 21,
       |          "Name": "${output.RECORDS_WRITTEN}",
       |          "Update": 12,
       |          "Internal": true,
       |          "Count Failed Values": true
       |        },
       |        {
-      |          "ID": 24,
+      |          "ID": 22,
       |          "Name": "$TEST_ACCUM",
       |          "Update": 0,
       |          "Internal": true,

http://git-wip-us.apache.org/repos/asf/spark/blob/8028a288/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 71f337c..7730823 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -630,7 +630,10 @@ object MimaExcludes {
         
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.getLocalProperty"),
         // [SPARK-14617] Remove deprecated APIs in TaskMetrics
         
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.InputMetrics$"),
-        
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.OutputMetrics$")
+        
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.OutputMetrics$"),
+        // [SPARK-14628] Simplify task metrics by always tracking read/write 
metrics
+        
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.InputMetrics.readMethod"),
+        
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.OutputMetrics.writeMethod")
       )
     case v if v.startsWith("1.6") =>
       Seq(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to