Repository: spark Updated Branches: refs/heads/branch-1.5 63ca9f921 -> 025e16b32
[SPARK-11047] Internal accumulators miss the internal flag when replaying events in the history server Internal accumulators don't write the internal flag to event log. So on the history server Web UI, all accumulators are not internal. This causes incorrect peak execution memory and unwanted accumulator table displayed on the stage page. To fix it, I add the "internal" property of AccumulableInfo when writing the event log. Author: Carson Wang <[email protected]> Closes #9061 from carsonwang/accumulableBug. (cherry picked from commit d45a0d3ca23df86cf0a95508ccc3b4b98f1b611c) Signed-off-by: Reynold Xin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/025e16b3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/025e16b3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/025e16b3 Branch: refs/heads/branch-1.5 Commit: 025e16b32aad60d2ccfbcdd64590b5eef85c4f0a Parents: 63ca9f9 Author: Carson Wang <[email protected]> Authored: Thu Oct 15 10:36:54 2015 -0700 Committer: Reynold Xin <[email protected]> Committed: Thu Oct 15 10:37:04 2015 -0700 ---------------------------------------------------------------------- .../spark/scheduler/AccumulableInfo.scala | 9 ++ .../org/apache/spark/util/JsonProtocol.scala | 6 +- .../apache/spark/util/JsonProtocolSuite.scala | 96 ++++++++++++++------ 3 files changed, 79 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/025e16b3/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala index 11d123e..59d123f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala @@ -40,6 +40,15 @@ class AccumulableInfo private[spark] ( } object AccumulableInfo { + def apply( + id: Long, + name: String, + update: Option[String], + value: String, + internal: Boolean): AccumulableInfo = { + new AccumulableInfo(id, name, update, value, internal) + } + def apply(id: Long, name: String, update: Option[String], value: String): AccumulableInfo = { new AccumulableInfo(id, name, update, value, internal = false) } http://git-wip-us.apache.org/repos/asf/spark/blob/025e16b3/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 311bb59..5cefbce 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -282,7 +282,8 @@ private[spark] object JsonProtocol { ("ID" -> accumulableInfo.id) ~ ("Name" -> accumulableInfo.name) ~ ("Update" -> accumulableInfo.update.map(new JString(_)).getOrElse(JNothing)) ~ - ("Value" -> accumulableInfo.value) + ("Value" -> accumulableInfo.value) ~ + ("Internal" -> accumulableInfo.internal) } def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = { @@ -695,7 +696,8 @@ private[spark] object JsonProtocol { val name = (json \ "Name").extract[String] val update = Utils.jsonOption(json \ "Update").map(_.extract[String]) val value = (json \ "Value").extract[String] - AccumulableInfo(id, name, update, value) + val internal = (json \ "Internal").extractOpt[Boolean].getOrElse(false) + AccumulableInfo(id, name, update, value, internal) } def taskMetricsFromJson(json: JValue): TaskMetrics = { http://git-wip-us.apache.org/repos/asf/spark/blob/025e16b3/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 25336bb..1be4827 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -364,6 +364,15 @@ class JsonProtocolSuite extends SparkFunSuite { assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied)) } + test("AccumulableInfo backward compatibility") { + // "Internal" property of AccumulableInfo were added after 1.5.1. + val accumulableInfo = makeAccumulableInfo(1) + val oldJson = JsonProtocol.accumulableInfoToJson(accumulableInfo) + .removeField({ _._1 == "Internal" }) + val oldInfo = JsonProtocol.accumulableInfoFromJson(oldJson) + assert(false === oldInfo.internal) + } + /** -------------------------- * | Helper test running methods | * --------------------------- */ @@ -721,15 +730,15 @@ class JsonProtocolSuite extends SparkFunSuite { val taskInfo = new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative) val (acc1, acc2, acc3) = - (makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3)) + (makeAccumulableInfo(1), makeAccumulableInfo(2), makeAccumulableInfo(3, internal = true)) taskInfo.accumulables += acc1 taskInfo.accumulables += acc2 taskInfo.accumulables += acc3 taskInfo } - private def makeAccumulableInfo(id: Int): AccumulableInfo = - AccumulableInfo(id, " Accumulable " + id, Some("delta" + id), "val" + id) + private def makeAccumulableInfo(id: Int, internal: Boolean = false): AccumulableInfo = + AccumulableInfo(id, " Accumulable " + id, Some("delta" + id), "val" + id, internal) /** * Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is @@ -810,13 +819,15 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 2, | "Name": "Accumulable2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 1, | "Name": "Accumulable1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | } | ] | }, @@ -864,13 +875,15 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 2, | "Name": "Accumulable2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 1, | "Name": "Accumulable1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | } | ] | } @@ -900,19 +913,22 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 1, | "Name": "Accumulable1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | }, | { | "ID": 2, | "Name": "Accumulable2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 3, | "Name": "Accumulable3", | "Update": "delta3", - | "Value": "val3" + | "Value": "val3", + | "Internal": true | } | ] | } @@ -940,19 +956,22 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 1, | "Name": "Accumulable1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | }, | { | "ID": 2, | "Name": "Accumulable2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 3, | "Name": "Accumulable3", | "Update": "delta3", - | "Value": "val3" + | "Value": "val3", + | "Internal": true | } | ] | } @@ -986,19 +1005,22 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 1, | "Name": "Accumulable1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | }, | { | "ID": 2, | "Name": "Accumulable2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 3, | "Name": "Accumulable3", | "Update": "delta3", - | "Value": "val3" + | "Value": "val3", + | "Internal": true | } | ] | }, @@ -1072,19 +1094,22 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 1, | "Name": "Accumulable1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | }, | { | "ID": 2, | "Name": "Accumulable2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 3, | "Name": "Accumulable3", | "Update": "delta3", - | "Value": "val3" + | "Value": "val3", + | "Internal": true | } | ] | }, @@ -1155,19 +1180,22 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 1, | "Name": "Accumulable1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | }, | { | "ID": 2, | "Name": "Accumulable2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 3, | "Name": "Accumulable3", | "Update": "delta3", - | "Value": "val3" + | "Value": "val3", + | "Internal": true | } | ] | }, @@ -1249,13 +1277,15 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 2, | "Name": " Accumulable 2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 1, | "Name": " Accumulable 1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | } | ] | }, @@ -1307,13 +1337,15 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 2, | "Name": " Accumulable 2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 1, | "Name": " Accumulable 1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | } | ] | }, @@ -1382,13 +1414,15 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 2, | "Name": " Accumulable 2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 1, | "Name": " Accumulable 1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | } | ] | }, @@ -1474,13 +1508,15 @@ class JsonProtocolSuite extends SparkFunSuite { | "ID": 2, | "Name": " Accumulable 2", | "Update": "delta2", - | "Value": "val2" + | "Value": "val2", + | "Internal": false | }, | { | "ID": 1, | "Name": " Accumulable 1", | "Update": "delta1", - | "Value": "val1" + | "Value": "val1", + | "Internal": false | } | ] | } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
