Repository: spark Updated Branches: refs/heads/master dd1c9cb36 -> 4a90276ab
http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 50f4205..0bc9492 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -47,7 +47,12 @@ class JsonProtocolSuite extends FunSuite { val taskEndWithOutput = SparkListenerTaskEnd(1, 0, "ResultTask", Success, makeTaskInfo(123L, 234, 67, 345L, false), makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = true)) - val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties) + val jobStart = { + val stageIds = Seq[Int](1, 2, 3, 4) + val stageInfos = stageIds.map(x => + makeStageInfo(x, x * 200, x * 300, x * 400L, x * 500L)) + SparkListenerJobStart(10, stageInfos, properties) + } val jobEnd = SparkListenerJobEnd(20, JobSucceeded) val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]]( "JVM Information" -> Seq(("GC speed", "9999 objects/s"), ("Java home", "Land of coffee")), @@ -224,6 +229,19 @@ class JsonProtocolSuite extends FunSuite { assert(expectedExecutorLostFailure === JsonProtocol.taskEndReasonFromJson(oldEvent)) } + test("SparkListenerJobStart backward compatibility") { + // Prior to Spark 1.2.0, SparkListenerJobStart did not have a "Stage Infos" property. + val stageIds = Seq[Int](1, 2, 3, 4) + val stageInfos = stageIds.map(x => makeStageInfo(x, x * 200, x * 300, x * 400, x * 500)) + val dummyStageInfos = + stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown")) + val jobStart = SparkListenerJobStart(10, stageInfos, properties) + val oldEvent = JsonProtocol.jobStartToJson(jobStart).removeField({_._1 == "Stage Infos"}) + val expectedJobStart = + SparkListenerJobStart(10, dummyStageInfos, properties) + assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldEvent)) + } + /** -------------------------- * | Helper test running methods | * --------------------------- */ @@ -306,7 +324,7 @@ class JsonProtocolSuite extends FunSuite { case (e1: SparkListenerJobStart, e2: SparkListenerJobStart) => assert(e1.jobId === e2.jobId) assert(e1.properties === e2.properties) - assertSeqEquals(e1.stageIds, e2.stageIds, (i1: Int, i2: Int) => assert(i1 === i2)) + assert(e1.stageIds === e2.stageIds) case (e1: SparkListenerJobEnd, e2: SparkListenerJobEnd) => assert(e1.jobId === e2.jobId) assertEquals(e1.jobResult, e2.jobResult) @@ -1051,6 +1069,260 @@ class JsonProtocolSuite extends FunSuite { |{ | "Event": "SparkListenerJobStart", | "Job ID": 10, + | "Stage Infos": [ + | { + | "Stage ID": 1, + | "Stage Attempt ID": 0, + | "Stage Name": "greetings", + | "Number of Tasks": 200, + | "RDD Info": [ + | { + | "RDD ID": 1, + | "Name": "mayor", + | "Storage Level": { + | "Use Disk": true, + | "Use Memory": true, + | "Use Tachyon": false, + | "Deserialized": true, + | "Replication": 1 + | }, + | "Number of Partitions": 200, + | "Number of Cached Partitions": 300, + | "Memory Size": 400, + | "Tachyon Size": 0, + | "Disk Size": 500 + | } + | ], + | "Details": "details", + | "Accumulables": [ + | { + | "ID": 2, + | "Name": " Accumulable 2", + | "Update": "delta2", + | "Value": "val2" + | }, + | { + | "ID": 1, + | "Name": " Accumulable 1", + | "Update": "delta1", + | "Value": "val1" + | } + | ] + | }, + | { + | "Stage ID": 2, + | "Stage Attempt ID": 0, + | "Stage Name": "greetings", + | "Number of Tasks": 400, + | "RDD Info": [ + | { + | "RDD ID": 2, + | "Name": "mayor", + | "Storage Level": { + | "Use Disk": true, + | "Use Memory": true, + | "Use Tachyon": false, + | "Deserialized": true, + | "Replication": 1 + | }, + | "Number of Partitions": 400, + | "Number of Cached Partitions": 600, + | "Memory Size": 800, + | "Tachyon Size": 0, + | "Disk Size": 1000 + | }, + | { + | "RDD ID": 3, + | "Name": "mayor", + | "Storage Level": { + | "Use Disk": true, + | "Use Memory": true, + | "Use Tachyon": false, + | "Deserialized": true, + | "Replication": 1 + | }, + | "Number of Partitions": 401, + | "Number of Cached Partitions": 601, + | "Memory Size": 801, + | "Tachyon Size": 0, + | "Disk Size": 1001 + | } + | ], + | "Details": "details", + | "Accumulables": [ + | { + | "ID": 2, + | "Name": " Accumulable 2", + | "Update": "delta2", + | "Value": "val2" + | }, + | { + | "ID": 1, + | "Name": " Accumulable 1", + | "Update": "delta1", + | "Value": "val1" + | } + | ] + | }, + | { + | "Stage ID": 3, + | "Stage Attempt ID": 0, + | "Stage Name": "greetings", + | "Number of Tasks": 600, + | "RDD Info": [ + | { + | "RDD ID": 3, + | "Name": "mayor", + | "Storage Level": { + | "Use Disk": true, + | "Use Memory": true, + | "Use Tachyon": false, + | "Deserialized": true, + | "Replication": 1 + | }, + | "Number of Partitions": 600, + | "Number of Cached Partitions": 900, + | "Memory Size": 1200, + | "Tachyon Size": 0, + | "Disk Size": 1500 + | }, + | { + | "RDD ID": 4, + | "Name": "mayor", + | "Storage Level": { + | "Use Disk": true, + | "Use Memory": true, + | "Use Tachyon": false, + | "Deserialized": true, + | "Replication": 1 + | }, + | "Number of Partitions": 601, + | "Number of Cached Partitions": 901, + | "Memory Size": 1201, + | "Tachyon Size": 0, + | "Disk Size": 1501 + | }, + | { + | "RDD ID": 5, + | "Name": "mayor", + | "Storage Level": { + | "Use Disk": true, + | "Use Memory": true, + | "Use Tachyon": false, + | "Deserialized": true, + | "Replication": 1 + | }, + | "Number of Partitions": 602, + | "Number of Cached Partitions": 902, + | "Memory Size": 1202, + | "Tachyon Size": 0, + | "Disk Size": 1502 + | } + | ], + | "Details": "details", + | "Accumulables": [ + | { + | "ID": 2, + | "Name": " Accumulable 2", + | "Update": "delta2", + | "Value": "val2" + | }, + | { + | "ID": 1, + | "Name": " Accumulable 1", + | "Update": "delta1", + | "Value": "val1" + | } + | ] + | }, + | { + | "Stage ID": 4, + | "Stage Attempt ID": 0, + | "Stage Name": "greetings", + | "Number of Tasks": 800, + | "RDD Info": [ + | { + | "RDD ID": 4, + | "Name": "mayor", + | "Storage Level": { + | "Use Disk": true, + | "Use Memory": true, + | "Use Tachyon": false, + | "Deserialized": true, + | "Replication": 1 + | }, + | "Number of Partitions": 800, + | "Number of Cached Partitions": 1200, + | "Memory Size": 1600, + | "Tachyon Size": 0, + | "Disk Size": 2000 + | }, + | { + | "RDD ID": 5, + | "Name": "mayor", + | "Storage Level": { + | "Use Disk": true, + | "Use Memory": true, + | "Use Tachyon": false, + | "Deserialized": true, + | "Replication": 1 + | }, + | "Number of Partitions": 801, + | "Number of Cached Partitions": 1201, + | "Memory Size": 1601, + | "Tachyon Size": 0, + | "Disk Size": 2001 + | }, + | { + | "RDD ID": 6, + | "Name": "mayor", + | "Storage Level": { + | "Use Disk": true, + | "Use Memory": true, + | "Use Tachyon": false, + | "Deserialized": true, + | "Replication": 1 + | }, + | "Number of Partitions": 802, + | "Number of Cached Partitions": 1202, + | "Memory Size": 1602, + | "Tachyon Size": 0, + | "Disk Size": 2002 + | }, + | { + | "RDD ID": 7, + | "Name": "mayor", + | "Storage Level": { + | "Use Disk": true, + | "Use Memory": true, + | "Use Tachyon": false, + | "Deserialized": true, + | "Replication": 1 + | }, + | "Number of Partitions": 803, + | "Number of Cached Partitions": 1203, + | "Memory Size": 1603, + | "Tachyon Size": 0, + | "Disk Size": 2003 + | } + | ], + | "Details": "details", + | "Accumulables": [ + | { + | "ID": 2, + | "Name": " Accumulable 2", + | "Update": "delta2", + | "Value": "val2" + | }, + | { + | "ID": 1, + | "Name": " Accumulable 1", + | "Update": "delta1", + | "Value": "val1" + | } + | ] + | } + | ], | "Stage IDs": [ | 1, | 2, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
