This is an automated email from the ASF dual-hosted git repository.
zsxwing pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 48017cc [SPARK-31923][CORE] Ignore internal accumulators that use unrecognized types rather than crashing (branch-2.4)
48017cc is described below
commit 48017cc36bdf7d84506daeed589e4cbebff269f8
Author: Shixiong Zhu <[email protected]>
AuthorDate: Mon Jun 8 16:52:34 2020 -0700
[SPARK-31923][CORE] Ignore internal accumulators that use unrecognized types rather than crashing (branch-2.4)
### What changes were proposed in this pull request?
Backport #28744 to branch-2.4.
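
For context, a minimal sketch (not part of the PR; the application name, accumulator name, and event-log directory below are hypothetical) of the kind of user code that can hit this path. The only thing that matters is that the accumulator's name starts with `InternalAccumulator.METRICS_PREFIX` ("internal.metrics.") while its value is neither an `Int`, a `Long`, nor the block-status list, so the old `accumValueToJson` could throw (e.g. a `MatchError`) while serializing task-end events for the event log:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical user application whose accumulator name happens to start
// with InternalAccumulator.METRICS_PREFIX ("internal.metrics.").
object MetricsPrefixCollision {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("metrics-prefix-collision")
      // Event logging serializes listener events through JsonProtocol.
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", "/tmp/spark-events") // hypothetical path; must already exist
    val sc = new SparkContext(conf)

    // JsonProtocol treats any accumulator with this name prefix as internal and,
    // before this fix, assumed its value was an Int, a Long, or the block-status list.
    val acc = sc.collectionAccumulator[String]("internal.metrics.myCustomStrings")
    sc.parallelize(1 to 10).foreach(i => acc.add(i.toString))

    sc.stop()
  }
}
```

With this change, unrecognized values are ignored (serialized as an empty array or omitted) rather than failing event serialization.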
### Why are the changes needed?
Low-risk fix for branch-2.4.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New unit tests.
Closes #28758 from zsxwing/SPARK-31923-2.4.
Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
---
.../scala/org/apache/spark/util/JsonProtocol.scala | 20 ++++++---
.../org/apache/spark/util/JsonProtocolSuite.scala | 47 ++++++++++++++++++++++
2 files changed, 62 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 50c6461..0e613ce 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -326,12 +326,22 @@ private[spark] object JsonProtocol {
         case v: Long => JInt(v)
         // We only have 3 kind of internal accumulator types, so if it's not int or long, it must be
         // the blocks accumulator, whose type is `java.util.List[(BlockId, BlockStatus)]`
-        case v =>
-          JArray(v.asInstanceOf[java.util.List[(BlockId, BlockStatus)]].asScala.toList.map {
-            case (id, status) =>
-              ("Block ID" -> id.toString) ~
-                ("Status" -> blockStatusToJson(status))
+        case v: java.util.List[_] =>
+          JArray(v.asScala.toList.flatMap {
+            case (id: BlockId, status: BlockStatus) =>
+              Some(
+                ("Block ID" -> id.toString) ~
+                  ("Status" -> blockStatusToJson(status))
+              )
+            case _ =>
+              // Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should
+              // not crash.
+              None
           })
+        case _ =>
+          // Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should not
+          // crash.
+          JNothing
       }
     } else {
       // For all external accumulators, just use strings
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 74b72d9..40fb2e3 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -436,6 +436,53 @@ class JsonProtocolSuite extends SparkFunSuite {
     testAccumValue(Some("anything"), 123, JString("123"))
   }
+  /** Create an AccumulableInfo and verify we can serialize and deserialize it. */
+  private def testAccumulableInfo(
+      name: String,
+      value: Option[Any],
+      expectedValue: Option[Any]): Unit = {
+    val isInternal = name.startsWith(InternalAccumulator.METRICS_PREFIX)
+    val accum = AccumulableInfo(
+      123L,
+      Some(name),
+      update = value,
+      value = value,
+      internal = isInternal,
+      countFailedValues = false)
+    val json = JsonProtocol.accumulableInfoToJson(accum)
+    val newAccum = JsonProtocol.accumulableInfoFromJson(json)
+    assert(newAccum == accum.copy(update = expectedValue, value = expectedValue))
+  }
+
+  test("SPARK-31923: unexpected value type of internal accumulator") {
+    // Because a user may use `METRICS_PREFIX` in an accumulator name, we should test unexpected
+    // types to make sure we don't crash.
+    import InternalAccumulator.METRICS_PREFIX
+    testAccumulableInfo(
+      METRICS_PREFIX + "fooString",
+      value = Some("foo"),
+      expectedValue = None)
+    testAccumulableInfo(
+      METRICS_PREFIX + "fooList",
+      value = Some(java.util.Arrays.asList("string")),
+      expectedValue = Some(java.util.Collections.emptyList())
+    )
+    val blocks = Seq(
+      (TestBlockId("block1"), BlockStatus(StorageLevel.MEMORY_ONLY, 1L, 2L)),
+      (TestBlockId("block2"), BlockStatus(StorageLevel.DISK_ONLY, 3L, 4L)))
+    testAccumulableInfo(
+      METRICS_PREFIX + "fooList",
+      value = Some(java.util.Arrays.asList(
+        "string",
+        blocks(0),
+        blocks(1))),
+      expectedValue = Some(blocks.asJava)
+    )
+    testAccumulableInfo(
+      METRICS_PREFIX + "fooSet",
+      value = Some(Set("foo")),
+      expectedValue = None)
+  }
 }
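
For readers who want the gist of the `JsonProtocol` change without parsing the diff above, here is a self-contained sketch of the same filtering idea. `BlockId`, `BlockStatus`, and `blockStatusToJson` below are simplified stand-ins rather than the real Spark classes; only the shape of the match mirrors the patched `accumValueToJson`, and it uses json4s (which Spark already uses for event serialization):

```scala
import scala.collection.JavaConverters._

import org.json4s.JsonAST._
import org.json4s.JsonDSL._

object AccumValueToJsonSketch {
  // Simplified stand-ins for org.apache.spark.storage.BlockId / BlockStatus.
  case class BlockId(name: String)
  case class BlockStatus(memSize: Long, diskSize: Long)

  def blockStatusToJson(status: BlockStatus): JValue =
    ("Memory Size" -> status.memSize) ~ ("Disk Size" -> status.diskSize)

  // Mirrors the shape of the patched branch: keep well-typed (BlockId, BlockStatus)
  // pairs, and silently ignore everything else instead of casting and crashing.
  def internalAccumValueToJson(value: Any): JValue = value match {
    case v: Int => JInt(v)
    case v: Long => JInt(v)
    case v: java.util.List[_] =>
      JArray(v.asScala.toList.flatMap {
        case (id: BlockId, status: BlockStatus) =>
          Some(("Block ID" -> id.name) ~ ("Status" -> blockStatusToJson(status)))
        case _ => None // unrecognized element type: drop it
      })
    case _ => JNothing // unrecognized value type: emit no value at all
  }

  def main(args: Array[String]): Unit = {
    // A mixed list, as a user-named "internal.metrics.*" accumulator might produce:
    // the String element is ignored, the valid pair is kept.
    val mixed = java.util.Arrays.asList[Any]("oops", (BlockId("block1"), BlockStatus(1L, 2L)))
    println(internalAccumValueToJson(mixed))
  }
}
```

This is the same design choice the tests above exercise: a list keeps only its recognizable `(BlockId, BlockStatus)` elements, and any other value type round-trips as no value at all.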
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]