Repository: spark
Updated Branches:
  refs/heads/master d67c82e4b -> 2bfed1a0c


[SPARK-15658][SQL] UDT serializer should declare its data type as udt instead 
of udt.sqlType

## What changes were proposed in this pull request?

When we build the serializer for a UDT object, we should declare its data type 
as the udt itself instead of udt.sqlType. Otherwise, when we deserialize it 
again, we lose the information that it is a UDT object and an analysis 
exception is thrown.

## How was this patch tested?

New test in `UserDefinedTypeSuite`.

Author: Wenchen Fan <[email protected]>

Closes #13402 from cloud-fan/udt.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2bfed1a0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2bfed1a0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2bfed1a0

Branch: refs/heads/master
Commit: 2bfed1a0c5be7d0718fd574a4dad90f4f6b44be7
Parents: d67c82e
Author: Wenchen Fan <[email protected]>
Authored: Tue May 31 11:00:38 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Tue May 31 11:00:38 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/catalyst/ScalaReflection.scala    | 4 ++--
 .../org/apache/spark/sql/catalyst/encoders/RowEncoder.scala      | 2 +-
 .../test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala   | 4 ++++
 3 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2bfed1a0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index bdd40f3..052cc48 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -568,7 +568,7 @@ object ScalaReflection extends ScalaReflection {
             udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt(),
             Nil,
             dataType = 
ObjectType(udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt()))
-          Invoke(obj, "serialize", udt.sqlType, inputObject :: Nil)
+          Invoke(obj, "serialize", udt, inputObject :: Nil)
 
         case t if UDTRegistration.exists(getClassNameFromType(t)) =>
           val udt = 
UDTRegistration.getUDTFor(getClassNameFromType(t)).get.newInstance()
@@ -577,7 +577,7 @@ object ScalaReflection extends ScalaReflection {
             udt.getClass,
             Nil,
             dataType = ObjectType(udt.getClass))
-          Invoke(obj, "serialize", udt.sqlType, inputObject :: Nil)
+          Invoke(obj, "serialize", udt, inputObject :: Nil)
 
         case t if definedByConstructorParams(t) =>
           val params = getConstructorParameters(t)

http://git-wip-us.apache.org/repos/asf/spark/blob/2bfed1a0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
index 2f8ba33..0de9166 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
@@ -88,7 +88,7 @@ object RowEncoder {
         udtClass,
         Nil,
         dataType = ObjectType(udtClass), false)
-      Invoke(obj, "serialize", udt.sqlType, inputObject :: Nil)
+      Invoke(obj, "serialize", udt, inputObject :: Nil)
 
     case TimestampType =>
       StaticInvoke(

http://git-wip-us.apache.org/repos/asf/spark/blob/2bfed1a0/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index 7d7b486..474f17f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -188,6 +188,10 @@ class UserDefinedTypeSuite extends QueryTest with 
SharedSQLContext with ParquetT
 
     val toCatalystConverter = 
CatalystTypeConverters.createToCatalystConverter(udt)
     assert(toCatalystConverter(null) === null)
+  }
 
+  test("SPARK-15658: Analysis exception if Dataset.map returns UDT object") {
+    // call `collect` to make sure this query can pass analysis.
+    pointsRDD.as[MyLabeledPoint].map(_.copy(label = 2.0)).collect()
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to