This is an automated email from the ASF dual-hosted git repository.
joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a39fc8773b2 [SPARK-39636][CORE][UI] Fix multiple bugs in JsonProtocol,
impacting off heap StorageLevels and Task/Executor ResourceRequests
a39fc8773b2 is described below
commit a39fc8773b2a4e9c58a1e5d0010e0c8396784c37
Author: Josh Rosen <[email protected]>
AuthorDate: Thu Jun 30 13:41:24 2022 -0700
[SPARK-39636][CORE][UI] Fix multiple bugs in JsonProtocol, impacting off
heap StorageLevels and Task/Executor ResourceRequests
### What changes were proposed in this pull request?
This PR fixes three longstanding bugs in Spark's `JsonProtocol`:
- `TaskResourceRequest` loses precision for fractional `amount` values (any
amount <= 0.5). The `amount` is a floating-point number which is either a
fraction no greater than 0.5 or a positive integer, but the JSON read path
assumes it is an integer.
- `ExecutorResourceRequest` amounts larger than `Int.MaxValue` overflow
because the write path writes the amount as a `Long` but the read path
extracts it as an `Int`.
- Off-heap StorageLevels are not handled properly: the `useOffHeap` field
isn't included in the JSON, so such StorageLevels cannot be round-tripped
through JSON. This could cause the History Server to display inaccurate "off
heap memory used" stats on the executors page.
I discovered these bugs while working on #36885.
### Why are the changes needed?
JsonProtocol should be able to round-trip events through JSON without loss
of information.
### Does this PR introduce _any_ user-facing change?
Yes: it fixes bugs that impact information shown in the History Server Web
UI. The new `Use Off Heap` StorageLevel JSON field will be visible to tools
which process raw event log JSON.
### How was this patch tested?
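Updated existing unit tests to cover the changed logic, and added a
backward-compatibility test for StorageLevel JSON that lacks the new
`Use Off Heap` field.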
Closes #37027 from JoshRosen/jsonprotocol-bugfixes.
Authored-by: Josh Rosen <[email protected]>
Signed-off-by: Josh Rosen <[email protected]>
---
.../scala/org/apache/spark/util/JsonProtocol.scala | 17 +++++--
.../org/apache/spark/util/JsonProtocolSuite.scala | 53 ++++++++++++++++++++--
2 files changed, 64 insertions(+), 6 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 0c15b13d5a1..f0755b04bef 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -512,6 +512,7 @@ private[spark] object JsonProtocol {
def storageLevelToJson(storageLevel: StorageLevel): JValue = {
("Use Disk" -> storageLevel.useDisk) ~
("Use Memory" -> storageLevel.useMemory) ~
+ ("Use Off Heap" -> storageLevel.useOffHeap) ~
("Deserialized" -> storageLevel.deserialized) ~
("Replication" -> storageLevel.replication)
}
@@ -750,7 +751,7 @@ private[spark] object JsonProtocol {
def executorResourceRequestFromJson(json: JValue): ExecutorResourceRequest =
{
val rName = (json \ "Resource Name").extract[String]
- val amount = (json \ "Amount").extract[Int]
+ val amount = (json \ "Amount").extract[Long]
val discoveryScript = (json \ "Discovery Script").extract[String]
val vendor = (json \ "Vendor").extract[String]
new ExecutorResourceRequest(rName, amount, discoveryScript, vendor)
@@ -758,7 +759,7 @@ private[spark] object JsonProtocol {
def taskResourceRequestFromJson(json: JValue): TaskResourceRequest = {
val rName = (json \ "Resource Name").extract[String]
- val amount = (json \ "Amount").extract[Int]
+ val amount = (json \ "Amount").extract[Double]
new TaskResourceRequest(rName, amount)
}
@@ -1202,9 +1203,19 @@ private[spark] object JsonProtocol {
def storageLevelFromJson(json: JValue): StorageLevel = {
val useDisk = (json \ "Use Disk").extract[Boolean]
val useMemory = (json \ "Use Memory").extract[Boolean]
+ // The "Use Off Heap" field was added in Spark 3.4.0
+ val useOffHeap = jsonOption(json \ "Use Off Heap") match {
+ case Some(value) => value.extract[Boolean]
+ case None => false
+ }
val deserialized = (json \ "Deserialized").extract[Boolean]
val replication = (json \ "Replication").extract[Int]
- StorageLevel(useDisk, useMemory, deserialized, replication)
+ StorageLevel(
+ useDisk = useDisk,
+ useMemory = useMemory,
+ useOffHeap = useOffHeap,
+ deserialized = deserialized,
+ replication = replication)
}
def blockStatusFromJson(json: JValue): BlockStatus = {
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index ea6267698c8..7a18223ec5b 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -136,9 +136,14 @@ class JsonProtocolSuite extends SparkFunSuite {
321L, 654L, 765L, 256912L, 123456L, 123456L, 61728L,
30364L, 15182L, 10L, 90L, 2L, 20L, 80001L)))
val rprofBuilder = new ResourceProfileBuilder()
- val taskReq = new TaskResourceRequests().cpus(1).resource("gpu", 1)
- val execReq =
- new ExecutorResourceRequests().cores(2).resource("gpu", 2, "myscript")
+ val taskReq = new TaskResourceRequests()
+ .cpus(1)
+ .resource("gpu", 1)
+ .resource("fgpa", 0.5)
+ val execReq: ExecutorResourceRequests = new ExecutorResourceRequests()
+ .cores(2)
+ .resource("gpu", 2, "myscript")
+ .resource("myCustomResource", amount = Int.MaxValue + 1L,
discoveryScript = "myscript2")
rprofBuilder.require(taskReq).require(execReq)
val resourceProfile = rprofBuilder.build
resourceProfile.setResourceProfileId(21)
@@ -203,6 +208,7 @@ class JsonProtocolSuite extends SparkFunSuite {
testStorageLevel(StorageLevel.MEMORY_AND_DISK_2)
testStorageLevel(StorageLevel.MEMORY_AND_DISK_SER)
testStorageLevel(StorageLevel.MEMORY_AND_DISK_SER_2)
+ testStorageLevel(StorageLevel.OFF_HEAP)
// JobResult
val exception = new Exception("Out of Memory! Please restock film.")
@@ -319,6 +325,21 @@ class JsonProtocolSuite extends SparkFunSuite {
val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
}
+ test("StorageLevel backward compatibility") {
+ // "Use Off Heap" was added in Spark 3.4.0
+ val level = StorageLevel(
+ useDisk = false,
+ useMemory = true,
+ useOffHeap = true,
+ deserialized = false,
+ replication = 1
+ )
+ val newJson = JsonProtocol.storageLevelToJson(level)
+ val oldJson = newJson.removeField { case (field, _) => field == "Use Off
Heap" }
+ val newLevel = JsonProtocol.storageLevelFromJson(oldJson)
+ assert(newLevel.useOffHeap === false)
+ }
+
test("BlockManager events backward compatibility") {
// SparkListenerBlockManagerAdded/Removed in Spark 1.0.0 do not have a
"time" property.
val blockManagerAdded = SparkListenerBlockManagerAdded(1L,
@@ -1189,6 +1210,7 @@ private[spark] object JsonProtocolSuite extends
Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
+ | "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
@@ -1437,6 +1459,7 @@ private[spark] object JsonProtocolSuite extends
Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
+ | "Use Off Heap": false,
| "Deserialized": false,
| "Replication": 2
| },
@@ -1563,6 +1586,7 @@ private[spark] object JsonProtocolSuite extends
Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
+ | "Use Off Heap": false,
| "Deserialized": false,
| "Replication": 2
| },
@@ -1689,6 +1713,7 @@ private[spark] object JsonProtocolSuite extends
Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
+ | "Use Off Heap": false,
| "Deserialized": false,
| "Replication": 2
| },
@@ -1722,6 +1747,7 @@ private[spark] object JsonProtocolSuite extends
Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
+ | "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
@@ -1769,6 +1795,7 @@ private[spark] object JsonProtocolSuite extends
Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
+ | "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
@@ -1787,6 +1814,7 @@ private[spark] object JsonProtocolSuite extends
Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
+ | "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
@@ -1834,6 +1862,7 @@ private[spark] object JsonProtocolSuite extends
Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
+ | "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
@@ -1852,6 +1881,7 @@ private[spark] object JsonProtocolSuite extends
Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
+ | "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
@@ -1870,6 +1900,7 @@ private[spark] object JsonProtocolSuite extends
Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
+ | "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
@@ -1917,6 +1948,7 @@ private[spark] object JsonProtocolSuite extends
Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
+ | "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
@@ -1935,6 +1967,7 @@ private[spark] object JsonProtocolSuite extends
Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
+ | "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
@@ -1953,6 +1986,7 @@ private[spark] object JsonProtocolSuite extends
Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
+ | "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
@@ -1971,6 +2005,7 @@ private[spark] object JsonProtocolSuite extends
Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
+ | "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
@@ -2291,6 +2326,7 @@ private[spark] object JsonProtocolSuite extends
Assertions {
| "Storage Level": {
| "Use Disk": true,
| "Use Memory": true,
+ | "Use Off Heap": false,
| "Deserialized": false,
| "Replication": 2
| },
@@ -2489,6 +2525,7 @@ private[spark] object JsonProtocolSuite extends
Assertions {
| "Storage Level": {
| "Use Disk": false,
| "Use Memory": true,
+ | "Use Off Heap": false,
| "Deserialized": true,
| "Replication": 1
| },
@@ -2578,6 +2615,12 @@ private[spark] object JsonProtocolSuite extends
Assertions {
| "Discovery Script":"",
| "Vendor":""
| },
+ | "myCustomResource":{
+ | "Resource Name":"myCustomResource",
+ | "Amount": 2147483648,
+ | "Discovery Script": "myscript2",
+ | "Vendor" : ""
+ | },
| "gpu":{
| "Resource Name":"gpu",
| "Amount":2,
@@ -2593,6 +2636,10 @@ private[spark] object JsonProtocolSuite extends
Assertions {
| "gpu":{
| "Resource Name":"gpu",
| "Amount":1.0
+ | },
+ | "fgpa":{
+ | "Resource Name":"fgpa",
+ | "Amount":0.5
| }
| }
|}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]