Repository: spark Updated Branches: refs/heads/master ce13c2672 -> d9dd979d1
[SPARK-18462] Fix ClassCastException in SparkListenerDriverAccumUpdates event ## What changes were proposed in this pull request? This patch fixes a `ClassCastException: java.lang.Integer cannot be cast to java.lang.Long` error which could occur in the HistoryServer while trying to process a deserialized `SparkListenerDriverAccumUpdates` event. The problem stems from how `jackson-module-scala` handles primitive type parameters (see https://github.com/FasterXML/jackson-module-scala/wiki/FAQ#deserializing-optionint-and-other-primitive-challenges for more details). This was causing a problem where our code expected a field to be deserialized as a `(Long, Long)` tuple but we got an `(Int, Int)` tuple instead. This patch hacks around this issue by registering a custom `Converter` with Jackson in order to deserialize the tuples as `(Object, Object)` and perform the appropriate casting. ## How was this patch tested? New regression tests in `SQLListenerSuite`. Author: Josh Rosen <joshro...@databricks.com> Closes #15922 from JoshRosen/SPARK-18462. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d9dd979d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d9dd979d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d9dd979d Branch: refs/heads/master Commit: d9dd979d170f44383a9a87f892f2486ddb3cca7d Parents: ce13c26 Author: Josh Rosen <joshro...@databricks.com> Authored: Thu Nov 17 18:45:15 2016 -0800 Committer: Reynold Xin <r...@databricks.com> Committed: Thu Nov 17 18:45:15 2016 -0800 ---------------------------------------------------------------------- .../spark/sql/execution/ui/SQLListener.scala | 39 ++++++++++++++++- .../sql/execution/ui/SQLListenerSuite.scala | 44 +++++++++++++++++++- 2 files changed, 80 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/d9dd979d/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index 60f1343..5daf215 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -19,6 +19,11 @@ package org.apache.spark.sql.execution.ui import scala.collection.mutable +import com.fasterxml.jackson.databind.JavaType +import com.fasterxml.jackson.databind.`type`.TypeFactory +import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import com.fasterxml.jackson.databind.util.Converter + import org.apache.spark.{JobExecutionStatus, SparkConf} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging @@ -43,9 +48,41 @@ case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long) extends SparkListenerEvent @DeveloperApi -case class SparkListenerDriverAccumUpdates(executionId: Long, accumUpdates: Seq[(Long, Long)]) +case class SparkListenerDriverAccumUpdates( + executionId: Long, + @JsonDeserialize(contentConverter = classOf[LongLongTupleConverter]) + accumUpdates: Seq[(Long, Long)]) extends SparkListenerEvent +/** + * Jackson [[Converter]] for converting an (Int, Int) tuple into a (Long, Long) tuple. + * + * This is necessary due to limitations in how Jackson's scala module deserializes primitives; + * see the "Deserializing Option[Int] and other primitive challenges" section in + * https://github.com/FasterXML/jackson-module-scala/wiki/FAQ for a discussion of this issue and + * SPARK-18462 for the specific problem that motivated this conversion. + */ +private class LongLongTupleConverter extends Converter[(Object, Object), (Long, Long)] { + + override def convert(in: (Object, Object)): (Long, Long) = { + def toLong(a: Object): Long = a match { + case i: java.lang.Integer => i.intValue() + case l: java.lang.Long => l.longValue() + } + (toLong(in._1), toLong(in._2)) + } + + override def getInputType(typeFactory: TypeFactory): JavaType = { + val objectType = typeFactory.uncheckedSimpleType(classOf[Object]) + typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(objectType, objectType)) + } + + override def getOutputType(typeFactory: TypeFactory): JavaType = { + val longType = typeFactory.uncheckedSimpleType(classOf[Long]) + typeFactory.constructSimpleType(classOf[(_, _)], classOf[(_, _)], Array(longType, longType)) + } +} + class SQLHistoryListenerFactory extends SparkHistoryListenerFactory { override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = { http://git-wip-us.apache.org/repos/asf/spark/blob/d9dd979d/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala index 948a155..8aea112 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.ui import java.util.Properties +import org.json4s.jackson.JsonMethods._ import org.mockito.Mockito.mock import org.apache.spark._ @@ -35,10 +36,10 @@ import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanIn import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.ui.SparkUI -import org.apache.spark.util.{AccumulatorMetadata, LongAccumulator} +import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator} -class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { +class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils { import testImplicits._ import org.apache.spark.AccumulatorSuite.makeInfo @@ -416,6 +417,45 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext { assert(driverUpdates(physicalPlan.longMetric("dummy").id) == expectedAccumValue) } + test("roundtripping SparkListenerDriverAccumUpdates through JsonProtocol (SPARK-18462)") { + val event = SparkListenerDriverAccumUpdates(1L, Seq((2L, 3L))) + val json = JsonProtocol.sparkEventToJson(event) + assertValidDataInJson(json, + parse(""" + |{ + | "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates", + | "executionId": 1, + | "accumUpdates": [[2,3]] + |} + """.stripMargin)) + JsonProtocol.sparkEventFromJson(json) match { + case SparkListenerDriverAccumUpdates(executionId, accums) => + assert(executionId == 1L) + accums.foreach { case (a, b) => + assert(a == 2L) + assert(b == 3L) + } + } + + // Test a case where the numbers in the JSON can only fit in longs: + val longJson = parse( + """ + |{ + | "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates", + | "executionId": 4294967294, + | "accumUpdates": [[4294967294,3]] + |} + """.stripMargin) + JsonProtocol.sparkEventFromJson(longJson) match { + case SparkListenerDriverAccumUpdates(executionId, accums) => + assert(executionId == 4294967294L) + accums.foreach { case (a, b) => + assert(a == 4294967294L) + assert(b == 3L) + } + } + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org