Repository: spark
Updated Branches:
  refs/heads/master 90b46e014 -> 8028a2888

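Context for the diff below: per the MimaExcludes entry at the end of this patch ([SPARK-14628] "Simplify task metrics by always tracking read/write metrics"), the input, output, shuffle-read and shuffle-write metrics on TaskMetrics are no longer Option-wrapped, and the Hadoop-specific readMethod/writeMethod fields are removed. A rough before/after sketch of what callers see (illustrative only; `tm` stands for any TaskMetrics instance, old usage shown as comments):

    // Before: sub-metrics had to be registered and then unwrapped.
    // val bytesWritten = tm.shuffleWriteMetrics.get.bytesWritten
    // After: sub-metrics are always present and can be read directly.
    val bytesWritten: Long = tm.shuffleWriteMetrics.bytesWritten
    val bytesRead: Long = tm.inputMetrics.bytesRead
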
http://git-wip-us.apache.org/repos/asf/spark/blob/8028a288/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index b854d74..5ba67af 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -266,18 +266,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
       taskInfoMetrics.foreach { case (taskInfo, taskMetrics) =>
         taskMetrics.resultSize should be > (0L)
         if (stageInfo.rddInfos.exists(info => info.name == d2.name || info.name == d3.name)) {
-          taskMetrics.inputMetrics should not be ('defined)
-          taskMetrics.outputMetrics should not be ('defined)
-          taskMetrics.shuffleWriteMetrics should be ('defined)
-          taskMetrics.shuffleWriteMetrics.get.bytesWritten should be > (0L)
+          assert(taskMetrics.shuffleWriteMetrics.bytesWritten > 0L)
         }
         if (stageInfo.rddInfos.exists(_.name == d4.name)) {
-          taskMetrics.shuffleReadMetrics should be ('defined)
-          val sm = taskMetrics.shuffleReadMetrics.get
-          sm.totalBlocksFetched should be (2*numSlices)
-          sm.localBlocksFetched should be (2*numSlices)
-          sm.remoteBlocksFetched should be (0)
-          sm.remoteBytesRead should be (0L)
+          assert(taskMetrics.shuffleReadMetrics.totalBlocksFetched == 2 * numSlices)
+          assert(taskMetrics.shuffleReadMetrics.localBlocksFetched == 2 * numSlices)
+          assert(taskMetrics.shuffleReadMetrics.remoteBlocksFetched == 0)
+          assert(taskMetrics.shuffleReadMetrics.remoteBytesRead == 0L)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/8028a288/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
index 16418f8..5132384 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterSuite.scala
@@ -144,7 +144,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
     assert(outputFile.exists())
     assert(outputFile.length() === 0)
     assert(temporaryFilesCreated.isEmpty)
-    val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get
+    val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics
     assert(shuffleWriteMetrics.bytesWritten === 0)
     assert(shuffleWriteMetrics.recordsWritten === 0)
     assert(taskMetrics.diskBytesSpilled === 0)
@@ -168,7 +168,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
     assert(writer.getPartitionLengths.sum === outputFile.length())
     assert(writer.getPartitionLengths.count(_ == 0L) === 4) // should be 4 zero length files
     assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted
-    val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get
+    val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics
     assert(shuffleWriteMetrics.bytesWritten === outputFile.length())
     assert(shuffleWriteMetrics.recordsWritten === records.length)
     assert(taskMetrics.diskBytesSpilled === 0)
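
The listener-facing effect of the change above is that a SparkListener can read shuffle metrics from task-end events without unwrapping Options. A minimal sketch of a listener written against the new accessors (the class name and the aggregation are invented for illustration; taskMetrics can still be null for failed tasks, so that check is kept):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    class ShuffleBytesListener extends SparkListener {
      @volatile var shuffleBytesWritten = 0L
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        val metrics = taskEnd.taskMetrics
        if (metrics != null) {
          // No more .get: shuffleWriteMetrics is always defined.
          shuffleBytesWritten += metrics.shuffleWriteMetrics.bytesWritten
        }
      }
    }
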
http://git-wip-us.apache.org/repos/asf/spark/blob/8028a288/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 7d4c086..85c877e 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -184,7 +184,7 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
     val conf = new SparkConf()
     val listener = new JobProgressListener(conf)
     val taskMetrics = new TaskMetrics()
-    val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics()
+    val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics()
     assert(listener.stageIdToData.size === 0)
 
     // finish this task, should get updated shuffleRead
@@ -272,10 +272,10 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
     val accums = InternalAccumulator.createAll()
     accums.foreach(Accumulators.register)
     val taskMetrics = new TaskMetrics(accums)
-    val shuffleReadMetrics = taskMetrics.registerTempShuffleReadMetrics()
-    val shuffleWriteMetrics = taskMetrics.registerShuffleWriteMetrics()
-    val inputMetrics = taskMetrics.registerInputMetrics(DataReadMethod.Hadoop)
-    val outputMetrics = taskMetrics.registerOutputMetrics(DataWriteMethod.Hadoop)
+    val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics()
+    val shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics
+    val inputMetrics = taskMetrics.inputMetrics
+    val outputMetrics = taskMetrics.outputMetrics
     shuffleReadMetrics.incRemoteBytesRead(base + 1)
     shuffleReadMetrics.incLocalBytesRead(base + 9)
     shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
@@ -322,12 +322,13 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
     assert(stage1Data.inputBytes == 207)
     assert(stage0Data.outputBytes == 116)
     assert(stage1Data.outputBytes == 208)
-    assert(stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.get
-      .totalBlocksFetched == 2)
-    assert(stage0Data.taskData.get(1235L).get.metrics.get.shuffleReadMetrics.get
-      .totalBlocksFetched == 102)
-    assert(stage1Data.taskData.get(1236L).get.metrics.get.shuffleReadMetrics.get
-      .totalBlocksFetched == 202)
+
+    assert(
+      stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 2)
+    assert(
+      stage0Data.taskData.get(1235L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 102)
+    assert(
+      stage1Data.taskData.get(1236L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 202)
 
     // task that was included in a heartbeat
     listener.onTaskEnd(SparkListenerTaskEnd(0, 0, taskType, Success, makeTaskInfo(1234L, 1),
@@ -355,9 +356,9 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
     assert(stage1Data.inputBytes == 614)
     assert(stage0Data.outputBytes == 416)
     assert(stage1Data.outputBytes == 616)
-    assert(stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.get
-      .totalBlocksFetched == 302)
-    assert(stage1Data.taskData.get(1237L).get.metrics.get.shuffleReadMetrics.get
-      .totalBlocksFetched == 402)
+    assert(
+      stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 302)
+    assert(
+      stage1Data.taskData.get(1237L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 402)
   }
 }
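
For code that fills in metrics by hand, as this test does, the registerXxxMetrics calls are replaced by the always-present accessors, and only the per-dependency shuffle-read path keeps a temporary object. A small sketch of the new idiom, using only calls that appear in the diff (the values are arbitrary):

    import org.apache.spark.executor.TaskMetrics

    val tm = new TaskMetrics()
    tm.inputMetrics.incRecordsRead(10)               // was registerInputMetrics(...)
    tm.shuffleWriteMetrics.incBytesWritten(1024)     // was registerShuffleWriteMetrics()
    val tempRead = tm.createTempShuffleReadMetrics() // was registerTempShuffleReadMetrics()
    tempRead.incRemoteBytesRead(512)
    tm.mergeShuffleReadMetrics()                     // folds temp objects into tm.shuffleReadMetrics
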
http://git-wip-us.apache.org/repos/asf/spark/blob/8028a288/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index de6f408..612c7c1 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -197,49 +197,41 @@ class JsonProtocolSuite extends SparkFunSuite {
   test("InputMetrics backward compatibility") {
     // InputMetrics were added after 1.0.1.
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, hasOutput = false)
-    assert(metrics.inputMetrics.nonEmpty)
     val newJson = JsonProtocol.taskMetricsToJson(metrics)
     val oldJson = newJson.removeField { case (field, _) => field == "Input Metrics" }
     val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
-    assert(newMetrics.inputMetrics.isEmpty)
   }
 
   test("Input/Output records backwards compatibility") {
     // records read were added after 1.2
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
       hasHadoopInput = true, hasOutput = true, hasRecords = false)
-    assert(metrics.inputMetrics.nonEmpty)
-    assert(metrics.outputMetrics.nonEmpty)
     val newJson = JsonProtocol.taskMetricsToJson(metrics)
     val oldJson = newJson.removeField { case (field, _) => field == "Records Read" }
       .removeField { case (field, _) => field == "Records Written" }
     val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
-    assert(newMetrics.inputMetrics.get.recordsRead == 0)
-    assert(newMetrics.outputMetrics.get.recordsWritten == 0)
+    assert(newMetrics.inputMetrics.recordsRead == 0)
+    assert(newMetrics.outputMetrics.recordsWritten == 0)
   }
 
   test("Shuffle Read/Write records backwards compatibility") {
     // records read were added after 1.2
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
       hasHadoopInput = false, hasOutput = false, hasRecords = false)
-    assert(metrics.shuffleReadMetrics.nonEmpty)
-    assert(metrics.shuffleWriteMetrics.nonEmpty)
     val newJson = JsonProtocol.taskMetricsToJson(metrics)
     val oldJson = newJson.removeField { case (field, _) => field == "Total Records Read" }
       .removeField { case (field, _) => field == "Shuffle Records Written" }
     val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
-    assert(newMetrics.shuffleReadMetrics.get.recordsRead == 0)
-    assert(newMetrics.shuffleWriteMetrics.get.recordsWritten == 0)
+    assert(newMetrics.shuffleReadMetrics.recordsRead == 0)
+    assert(newMetrics.shuffleWriteMetrics.recordsWritten == 0)
   }
 
   test("OutputMetrics backward compatibility") {
     // OutputMetrics were added after 1.1
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = false, hasOutput = true)
-    assert(metrics.outputMetrics.nonEmpty)
     val newJson = JsonProtocol.taskMetricsToJson(metrics)
     val oldJson = newJson.removeField { case (field, _) => field == "Output Metrics" }
     val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
-    assert(newMetrics.outputMetrics.isEmpty)
   }
 
   test("BlockManager events backward compatibility") {
@@ -279,11 +271,10 @@ class JsonProtocolSuite extends SparkFunSuite {
     // Metrics about local shuffle bytes read were added in 1.3.1.
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
       hasHadoopInput = false, hasOutput = false, hasRecords = false)
-    assert(metrics.shuffleReadMetrics.nonEmpty)
     val newJson = JsonProtocol.taskMetricsToJson(metrics)
     val oldJson = newJson.removeField { case (field, _) => field == "Local Bytes Read" }
     val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
-    assert(newMetrics.shuffleReadMetrics.get.localBytesRead == 0)
+    assert(newMetrics.shuffleReadMetrics.localBytesRead == 0)
   }
 
   test("SparkListenerApplicationStart backwards compatibility") {
@@ -423,7 +414,6 @@ class JsonProtocolSuite extends SparkFunSuite {
     })
     testAccumValue(Some(RESULT_SIZE), 3L, JInt(3))
     testAccumValue(Some(shuffleRead.REMOTE_BLOCKS_FETCHED), 2, JInt(2))
-    testAccumValue(Some(input.READ_METHOD), "aka", JString("aka"))
     testAccumValue(Some(UPDATED_BLOCK_STATUSES), blocks, blocksJson)
     // For anything else, we just cast the value to a string
     testAccumValue(Some("anything"), blocks, JString(blocks.toString))
@@ -619,12 +609,9 @@ private[spark] object JsonProtocolSuite extends Assertions {
     assert(metrics1.resultSerializationTime === metrics2.resultSerializationTime)
     assert(metrics1.memoryBytesSpilled === metrics2.memoryBytesSpilled)
     assert(metrics1.diskBytesSpilled === metrics2.diskBytesSpilled)
-    assertOptionEquals(
-      metrics1.shuffleReadMetrics, metrics2.shuffleReadMetrics, assertShuffleReadEquals)
-    assertOptionEquals(
-      metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics, assertShuffleWriteEquals)
-    assertOptionEquals(
-      metrics1.inputMetrics, metrics2.inputMetrics, assertInputMetricsEquals)
+    assertEquals(metrics1.shuffleReadMetrics, metrics2.shuffleReadMetrics)
+    assertEquals(metrics1.shuffleWriteMetrics, metrics2.shuffleWriteMetrics)
+    assertEquals(metrics1.inputMetrics, metrics2.inputMetrics)
     assertBlocksEquals(metrics1.updatedBlockStatuses, metrics2.updatedBlockStatuses)
   }
 
@@ -641,7 +628,6 @@ private[spark] object JsonProtocolSuite extends Assertions {
   }
 
   private def assertEquals(metrics1: InputMetrics, metrics2: InputMetrics) {
-    assert(metrics1.readMethod === metrics2.readMethod)
     assert(metrics1.bytesRead === metrics2.bytesRead)
   }
 
@@ -706,12 +692,13 @@ private[spark] object JsonProtocolSuite extends Assertions {
   }
 
   private def assertJsonStringEquals(expected: String, actual: String, metadata: String) {
-    val formatJsonString = (json: String) => json.replaceAll("[\\s|]", "")
-    if (formatJsonString(expected) != formatJsonString(actual)) {
+    val expectedJson = pretty(parse(expected))
+    val actualJson = pretty(parse(actual))
+    if (expectedJson != actualJson) {
       // scalastyle:off
       // This prints something useful if the JSON strings don't match
-      println("=== EXPECTED ===\n" + pretty(parse(expected)) + "\n")
-      println("=== ACTUAL ===\n" + pretty(parse(actual)) + "\n")
+      println("=== EXPECTED ===\n" + expectedJson + "\n")
+      println("=== ACTUAL ===\n" + actualJson + "\n")
       // scalastyle:on
       throw new TestFailedException(s"$metadata JSON did not equal", 1)
     }
@@ -740,22 +727,6 @@ private[spark] object JsonProtocolSuite extends Assertions {
    * Use different names for methods we pass in to assertSeqEquals or assertOptionEquals
    */
 
-  private def assertShuffleReadEquals(r1: ShuffleReadMetrics, r2: ShuffleReadMetrics) {
-    assertEquals(r1, r2)
-  }
-
-  private def assertShuffleWriteEquals(w1: ShuffleWriteMetrics, w2: ShuffleWriteMetrics) {
-    assertEquals(w1, w2)
-  }
-
-  private def assertInputMetricsEquals(i1: InputMetrics, i2: InputMetrics) {
-    assertEquals(i1, i2)
-  }
-
-  private def assertTaskMetricsEquals(t1: TaskMetrics, t2: TaskMetrics) {
-    assertEquals(t1, t2)
-  }
-
   private def assertBlocksEquals(
       blocks1: Seq[(BlockId, BlockStatus)],
       blocks2: Seq[(BlockId, BlockStatus)]) = {
@@ -851,11 +822,11 @@ private[spark] object JsonProtocolSuite extends Assertions {
     t.incMemoryBytesSpilled(a + c)
 
     if (hasHadoopInput) {
-      val inputMetrics = t.registerInputMetrics(DataReadMethod.Hadoop)
+      val inputMetrics = t.inputMetrics
       inputMetrics.setBytesRead(d + e + f)
       inputMetrics.incRecordsRead(if (hasRecords) (d + e + f) / 100 else -1)
     } else {
-      val sr = t.registerTempShuffleReadMetrics()
+      val sr = t.createTempShuffleReadMetrics()
       sr.incRemoteBytesRead(b + d)
       sr.incLocalBlocksFetched(e)
       sr.incFetchWaitTime(a + d)
@@ -865,11 +836,10 @@ private[spark] object JsonProtocolSuite extends Assertions {
       t.mergeShuffleReadMetrics()
     }
     if (hasOutput) {
-      val outputMetrics = t.registerOutputMetrics(DataWriteMethod.Hadoop)
-      outputMetrics.setBytesWritten(a + b + c)
-      outputMetrics.setRecordsWritten(if (hasRecords) (a + b + c)/100 else -1)
+      t.outputMetrics.setBytesWritten(a + b + c)
+      t.outputMetrics.setRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
     } else {
-      val sw = t.registerShuffleWriteMetrics()
+      val sw = t.shuffleWriteMetrics
       sw.incBytesWritten(a + b + c)
       sw.incWriteTime(b + c + d)
       sw.incRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1)
@@ -896,7 +866,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | "Stage Name": "greetings",
      | "Number of Tasks": 200,
      | "RDD Info": [],
-     | "ParentIDs" : [100, 200, 300],
+     | "Parent IDs" : [100, 200, 300],
      | "Details": "details",
      | "Accumulables": [
      | {
@@ -924,7 +894,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | "Ukraine": "Kiev"
      | }
      |}
-    """
+    """.stripMargin
 
   private val stageCompletedJsonString =
     """
@@ -953,7 +923,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | "Disk Size": 501
      | }
      | ],
-     | "ParentIDs" : [100, 200, 300],
+     | "Parent IDs" : [100, 200, 300],
      | "Details": "details",
      | "Accumulables": [
      | {
@@ -975,7 +945,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | ]
      | }
      |}
-    """
+    """.stripMargin
 
   private val taskStartJsonString =
     """
@@ -1223,7 +1193,6 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | "Shuffle Records Written": 12
      | },
      | "Input Metrics": {
-     | "Data Read Method": "Hadoop",
      | "Bytes Read": 2100,
      | "Records Read": 21
      | },
@@ -1244,7 +1213,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | ]
      | }
      |}
-    """
+    """.stripMargin
 
   private val taskEndWithOutputJsonString =
     """
@@ -1304,12 +1273,10 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | "Memory Bytes Spilled": 800,
      | "Disk Bytes Spilled": 0,
      | "Input Metrics": {
-     | "Data Read Method": "Hadoop",
      | "Bytes Read": 2100,
      | "Records Read": 21
      | },
      | "Output Metrics": {
-     | "Data Write Method": "Hadoop",
      | "Bytes Written": 1200,
      | "Records Written": 12
      | },
@@ -1330,7 +1297,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | ]
      | }
      |}
-    """
+    """.stripMargin
 
   private val jobStartJsonString =
     """
@@ -1422,7 +1389,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | "Disk Size": 1001
      | }
      | ],
-     | "ParentIDs" : [100, 200, 300],
+     | "Parent IDs" : [100, 200, 300],
      | "Details": "details",
      | "Accumulables": [
      | {
@@ -1498,7 +1465,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | "Disk Size": 1502
      | }
      | ],
-     | "ParentIDs" : [100, 200, 300],
+     | "Parent IDs" : [100, 200, 300],
      | "Details": "details",
      | "Accumulables": [
      | {
@@ -1590,7 +1557,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | "Disk Size": 2003
      | }
      | ],
-     | "ParentIDs" : [100, 200, 300],
+     | "Parent IDs" : [100, 200, 300],
      | "Details": "details",
      | "Accumulables": [
      | {
@@ -1625,7 +1592,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | "Ukraine": "Kiev"
      | }
      |}
-    """
+    """.stripMargin
 
   private val jobEndJsonString =
     """
@@ -1637,7 +1604,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | "Result": "JobSucceeded"
      | }
      |}
-    """
+    """.stripMargin
 
   private val environmentUpdateJsonString =
     """
@@ -1658,7 +1625,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | "Super library": "/tmp/super_library"
      | }
      |}
-    """
+    """.stripMargin
 
   private val blockManagerAddedJsonString =
     """
@@ -1672,7 +1639,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | "Maximum Memory": 500,
      | "Timestamp": 1
      |}
-    """
+    """.stripMargin
 
   private val blockManagerRemovedJsonString =
     """
@@ -1685,7 +1652,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | },
      | "Timestamp": 2
      |}
-    """
+    """.stripMargin
 
   private val unpersistRDDJsonString =
     """
@@ -1693,7 +1660,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | "Event": "SparkListenerUnpersistRDD",
      | "RDD ID": 12345
      |}
-    """
+    """.stripMargin
 
   private val applicationStartJsonString =
     """
@@ -1705,7 +1672,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | "User": "Garfield",
      | "App Attempt ID": "appAttempt"
      |}
-    """
+    """.stripMargin
 
   private val applicationStartJsonWithLogUrlsString =
     """
@@ -1721,7 +1688,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | "stdout" : "mystdout"
      | }
      |}
-    """
+    """.stripMargin
 
   private val applicationEndJsonString =
     """
@@ -1729,7 +1696,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | "Event": "SparkListenerApplicationEnd",
      | "Timestamp": 42
      |}
-    """
+    """.stripMargin
 
   private val executorAddedJsonString =
     s"""
@@ -1746,7 +1713,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | }
      | }
      |}
-    """
+    """.stripMargin
 
   private val executorRemovedJsonString =
     s"""
@@ -1756,7 +1723,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | "Executor ID": "exec2",
      | "Removed Reason": "test reason"
      |}
-    """
+    """.stripMargin
 
   private val executorMetricsUpdateJsonString =
     s"""
@@ -1830,16 +1797,16 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | "Name": "$UPDATED_BLOCK_STATUSES",
      | "Update": [
      | {
-     | "BlockID": "rdd_0_0",
+     | "Block ID": "rdd_0_0",
      | "Status": {
-     | "StorageLevel": {
-     | "UseDisk": true,
-     | "UseMemory": true,
+     | "Storage Level": {
+     | "Use Disk": true,
+     | "Use Memory": true,
      | "Deserialized": false,
      | "Replication": 2
      | },
-     | "MemorySize": 0,
-     | "DiskSize": 0
+     | "Memory Size": 0,
+     | "Disk Size": 0
      | }
      | }
      | ],
@@ -1911,48 +1878,34 @@ private[spark] object JsonProtocolSuite extends Assertions {
      | },
      | {
      | "ID": 18,
-     | "Name": "${input.READ_METHOD}",
-     | "Update": "Hadoop",
-     | "Internal": true,
-     | "Count Failed Values": true
-     | },
-     | {
-     | "ID": 19,
      | "Name": "${input.BYTES_READ}",
      | "Update": 2100,
      | "Internal": true,
      | "Count Failed Values": true
      | },
      | {
-     | "ID": 20,
+     | "ID": 19,
      | "Name": "${input.RECORDS_READ}",
      | "Update": 21,
      | "Internal": true,
      | "Count Failed Values": true
      | },
      | {
-     | "ID": 21,
-     | "Name": "${output.WRITE_METHOD}",
-     | "Update": "Hadoop",
-     | "Internal": true,
-     | "Count Failed Values": true
-     | },
-     | {
-     | "ID": 22,
+     | "ID": 20,
      | "Name": "${output.BYTES_WRITTEN}",
      | "Update": 1200,
| "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 23, + | "ID": 21, | "Name": "${output.RECORDS_WRITTEN}", | "Update": 12, | "Internal": true, | "Count Failed Values": true | }, | { - | "ID": 24, + | "ID": 22, | "Name": "$TEST_ACCUM", | "Update": 0, | "Internal": true, http://git-wip-us.apache.org/repos/asf/spark/blob/8028a288/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 71f337c..7730823 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -630,7 +630,10 @@ object MimaExcludes { ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.getLocalProperty"), // [SPARK-14617] Remove deprecated APIs in TaskMetrics ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.InputMetrics$"), - ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.OutputMetrics$") + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.executor.OutputMetrics$"), + // [SPARK-14628] Simplify task metrics by always tracking read/write metrics + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.InputMetrics.readMethod"), + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.executor.OutputMetrics.writeMethod") ) case v if v.startsWith("1.6") => Seq( --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
