Repository: spark Updated Branches: refs/heads/branch-2.0 6347ff512 -> 29b94fdb3
[SPARK-15658][SQL] UDT serializer should declare its data type as udt instead of udt.sqlType ## What changes were proposed in this pull request? When we build the serializer for a UDT object, we should declare its data type as the UDT itself instead of udt.sqlType; otherwise, if we deserialize it again, we lose the information that it is a UDT object and an analysis exception is thrown. ## How was this patch tested? A new test in `UserDefinedTypeSuite`. Author: Wenchen Fan <[email protected]> Closes #13402 from cloud-fan/udt. (cherry picked from commit 2bfed1a0c5be7d0718fd574a4dad90f4f6b44be7) Signed-off-by: Yin Huai <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29b94fdb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29b94fdb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29b94fdb Branch: refs/heads/branch-2.0 Commit: 29b94fdb380e78ea173585db8bd63def92c4684c Parents: 6347ff5 Author: Wenchen Fan <[email protected]> Authored: Tue May 31 11:00:38 2016 -0700 Committer: Yin Huai <[email protected]> Committed: Tue May 31 11:01:14 2016 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/sql/catalyst/ScalaReflection.scala | 4 ++-- .../org/apache/spark/sql/catalyst/encoders/RowEncoder.scala | 2 +- .../test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala | 4 ++++ 3 files changed, 7 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/29b94fdb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala index bdd40f3..052cc48 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala @@ -568,7 +568,7 @@ object ScalaReflection extends ScalaReflection { udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt(), Nil, dataType = ObjectType(udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt())) - Invoke(obj, "serialize", udt.sqlType, inputObject :: Nil) + Invoke(obj, "serialize", udt, inputObject :: Nil) case t if UDTRegistration.exists(getClassNameFromType(t)) => val udt = UDTRegistration.getUDTFor(getClassNameFromType(t)).get.newInstance() @@ -577,7 +577,7 @@ object ScalaReflection extends ScalaReflection { udt.getClass, Nil, dataType = ObjectType(udt.getClass)) - Invoke(obj, "serialize", udt.sqlType, inputObject :: Nil) + Invoke(obj, "serialize", udt, inputObject :: Nil) case t if definedByConstructorParams(t) => val params = getConstructorParameters(t) http://git-wip-us.apache.org/repos/asf/spark/blob/29b94fdb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala index 2f8ba33..0de9166 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala @@ -88,7 +88,7 @@ object RowEncoder { udtClass, Nil, dataType = ObjectType(udtClass), false) - Invoke(obj, "serialize", udt.sqlType, inputObject :: Nil) + Invoke(obj, "serialize", udt, inputObject :: Nil) case TimestampType => StaticInvoke( http://git-wip-us.apache.org/repos/asf/spark/blob/29b94fdb/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala 
---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala index 7d7b486..474f17f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala @@ -188,6 +188,10 @@ class UserDefinedTypeSuite extends QueryTest with SharedSQLContext with ParquetT val toCatalystConverter = CatalystTypeConverters.createToCatalystConverter(udt) assert(toCatalystConverter(null) === null) + } + test("SPARK-15658: Analysis exception if Dataset.map returns UDT object") { + // call `collect` to make sure this query can pass analysis. + pointsRDD.as[MyLabeledPoint].map(_.copy(label = 2.0)).collect() } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
