Repository: spark
Updated Branches:
  refs/heads/branch-2.0 6347ff512 -> 29b94fdb3


[SPARK-15658][SQL] UDT serializer should declare its data type as udt instead 
of udt.sqlType

## What changes were proposed in this pull request?

When we build the serializer for a UDT object, we should declare its data type 
as udt instead of udt.sqlType; otherwise, when we deserialize it again, we lose 
the information that it's a UDT object and throw an analysis exception.

## How was this patch tested?

New test in `UserDefinedTypeSuite`

Author: Wenchen Fan <[email protected]>

Closes #13402 from cloud-fan/udt.

(cherry picked from commit 2bfed1a0c5be7d0718fd574a4dad90f4f6b44be7)
Signed-off-by: Yin Huai <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29b94fdb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29b94fdb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29b94fdb

Branch: refs/heads/branch-2.0
Commit: 29b94fdb380e78ea173585db8bd63def92c4684c
Parents: 6347ff5
Author: Wenchen Fan <[email protected]>
Authored: Tue May 31 11:00:38 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Tue May 31 11:01:14 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/catalyst/ScalaReflection.scala    | 4 ++--
 .../org/apache/spark/sql/catalyst/encoders/RowEncoder.scala      | 2 +-
 .../test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala   | 4 ++++
 3 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/29b94fdb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index bdd40f3..052cc48 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -568,7 +568,7 @@ object ScalaReflection extends ScalaReflection {
             udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt(),
             Nil,
             dataType = 
ObjectType(udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt()))
-          Invoke(obj, "serialize", udt.sqlType, inputObject :: Nil)
+          Invoke(obj, "serialize", udt, inputObject :: Nil)
 
         case t if UDTRegistration.exists(getClassNameFromType(t)) =>
           val udt = 
UDTRegistration.getUDTFor(getClassNameFromType(t)).get.newInstance()
@@ -577,7 +577,7 @@ object ScalaReflection extends ScalaReflection {
             udt.getClass,
             Nil,
             dataType = ObjectType(udt.getClass))
-          Invoke(obj, "serialize", udt.sqlType, inputObject :: Nil)
+          Invoke(obj, "serialize", udt, inputObject :: Nil)
 
         case t if definedByConstructorParams(t) =>
           val params = getConstructorParameters(t)

http://git-wip-us.apache.org/repos/asf/spark/blob/29b94fdb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
index 2f8ba33..0de9166 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
@@ -88,7 +88,7 @@ object RowEncoder {
         udtClass,
         Nil,
         dataType = ObjectType(udtClass), false)
-      Invoke(obj, "serialize", udt.sqlType, inputObject :: Nil)
+      Invoke(obj, "serialize", udt, inputObject :: Nil)
 
     case TimestampType =>
       StaticInvoke(

http://git-wip-us.apache.org/repos/asf/spark/blob/29b94fdb/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index 7d7b486..474f17f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -188,6 +188,10 @@ class UserDefinedTypeSuite extends QueryTest with 
SharedSQLContext with ParquetT
 
     val toCatalystConverter = 
CatalystTypeConverters.createToCatalystConverter(udt)
     assert(toCatalystConverter(null) === null)
+  }
 
+  test("SPARK-15658: Analysis exception if Dataset.map returns UDT object") {
+    // call `collect` to make sure this query can pass analysis.
+    pointsRDD.as[MyLabeledPoint].map(_.copy(label = 2.0)).collect()
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to