This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 077b27ceb34b [SPARK-52671][SQL] RowEncoder shall not lookup a resolved 
UDT
077b27ceb34b is described below

commit 077b27ceb34bff3f7fefdbb18f590385ec353e0a
Author: Kent Yao <y...@apache.org>
AuthorDate: Fri Jul 4 19:21:56 2025 -0700

    [SPARK-52671][SQL] RowEncoder shall not lookup a resolved UDT
    
    ### What changes were proposed in this pull request?
    
    This PR fixes the bug shown below:
    ```sql
    spark-sql (default)> select submissionTime from bbb;
    xxx
    spark-sql (default)> cache table a as select submissionTime from bbb;
    org.apache.spark.SparkException: xxx is not annotated with 
SQLUserDefinedType nor registered with UDTRegistration.}
            at 
org.apache.spark.sql.errors.ExecutionErrors.userDefinedTypeNotAnnotatedAndRegisteredError(ExecutionErrors.scala:167)
            at 
org.apache.spark.sql.errors.ExecutionErrors.userDefinedTypeNotAnnotatedAndRegisteredError$(ExecutionErrors.scala:163)
            at 
org.apache.spark.sql.errors.ExecutionErrors$.userDefinedTypeNotAnnotatedAndRegisteredError(ExecutionErrors.scala:259)
            at 
org.apache.spark.sql.catalyst.encoders.RowEncoder$.$anonfun$encoderForDataType$1(RowEncoder.scala:108)
            at scala.Option.getOrElse(Option.scala:201)
            at 
org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderForDataType(RowEncoder.scala:108)
            at 
org.apache.spark.sql.catalyst.encoders.RowEncoder$.$anonfun$encoderForDataType$2(RowEncoder.scala:128)
            at scala.collection.ArrayOps$.map$extension(ArrayOps.scala:936)
            at 
org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderForDataType(RowEncoder.scala:125)
            at 
org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderFor(RowEncoder.scala:69)
            at 
org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderFor(RowEncoder.scala:65)
            at org.apache.spark.sql.classic.Dataset.toDF(Dataset.scala:519)
            at org.apache.spark.sql.classic.Dataset.groupBy(Dataset.scala:941)
            at org.apache.spark.sql.classic.Dataset.count(Dataset.scala:1501)
    ```
    
    The RowEncoder tried to look up a UDT again even though it was already
    resolved. It now builds the encoder from the resolved UDT instance's own
    class instead.
    
    ### Why are the changes needed?
    bugfix
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    
    Modified tests in
    sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
    to also cover a UDT whose user class is not annotated.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #51360 from yaooqinn/SPARK-52671.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../src/main/resources/error/error-conditions.json |  5 -----
 .../spark/sql/catalyst/encoders/RowEncoder.scala   | 13 ++----------
 .../apache/spark/sql/errors/ExecutionErrors.scala  |  9 +--------
 .../sql/catalyst/encoders/RowEncoderSuite.scala    | 23 +++++++++++++++++++++-
 4 files changed, 25 insertions(+), 25 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 000b1f524f20..cb7e2faaf2be 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -8345,11 +8345,6 @@
       "Failed to get outer pointer for <innerCls>."
     ]
   },
-  "_LEGACY_ERROR_TEMP_2155" : {
-    "message" : [
-      "<userClass> is not annotated with SQLUserDefinedType nor registered 
with UDTRegistration.}"
-    ]
-  },
   "_LEGACY_ERROR_TEMP_2163" : {
     "message" : [
       "Initial type <dataType> must be a <target>."
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
index d5692bb85c4e..620278c66d21 100644
--- 
a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
+++ 
b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
@@ -22,7 +22,7 @@ import scala.reflect.classTag
 
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, 
BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, 
BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, 
CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, InstantEncoder, 
IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, 
LocalTimeEncoder, MapEncoder, NullEncoder, RowEncoder => AgnosticRowEncoder, 
StringEncoder, Timesta [...]
-import org.apache.spark.sql.errors.{DataTypeErrorsBase, ExecutionErrors}
+import org.apache.spark.sql.errors.DataTypeErrorsBase
 import org.apache.spark.sql.internal.SqlApiConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.ArrayImplicits._
@@ -99,16 +99,7 @@ object RowEncoder extends DataTypeErrorsBase {
       case p: PythonUserDefinedType =>
         // TODO check if this works.
         encoderForDataType(p.sqlType, lenient)
-      case udt: UserDefinedType[_] =>
-        val annotation = 
udt.userClass.getAnnotation(classOf[SQLUserDefinedType])
-        val udtClass: Class[_] = if (annotation != null) {
-          annotation.udt()
-        } else {
-          UDTRegistration.getUDTFor(udt.userClass.getName).getOrElse {
-            throw 
ExecutionErrors.userDefinedTypeNotAnnotatedAndRegisteredError(udt)
-          }
-        }
-        UDTEncoder(udt, udtClass.asInstanceOf[Class[_ <: UserDefinedType[_]]])
+      case udt: UserDefinedType[_] => UDTEncoder(udt, udt.getClass)
       case ArrayType(elementType, containsNull) =>
         IterableEncoder(
           classTag[mutable.ArraySeq[_]],
diff --git 
a/sql/api/src/main/scala/org/apache/spark/sql/errors/ExecutionErrors.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/errors/ExecutionErrors.scala
index 8124b1a4ab19..1a4369b172f8 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/errors/ExecutionErrors.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/ExecutionErrors.scala
@@ -24,7 +24,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType
 import org.apache.spark.{QueryContext, SparkArithmeticException, 
SparkBuildInfo, SparkDateTimeException, SparkException, SparkRuntimeException, 
SparkUnsupportedOperationException, SparkUpgradeException}
 import org.apache.spark.sql.catalyst.WalkedTypePath
 import org.apache.spark.sql.internal.SqlApiConf
-import org.apache.spark.sql.types.{DataType, DoubleType, StringType, 
UserDefinedType}
+import org.apache.spark.sql.types.{DataType, DoubleType, StringType}
 import org.apache.spark.unsafe.types.UTF8String
 
 private[sql] trait ExecutionErrors extends DataTypeErrorsBase {
@@ -160,13 +160,6 @@ private[sql] trait ExecutionErrors extends 
DataTypeErrorsBase {
       messageParameters = Map("typeName" -> toSQLType(typeName)))
   }
 
-  def userDefinedTypeNotAnnotatedAndRegisteredError(udt: UserDefinedType[_]): 
Throwable = {
-    new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2155",
-      messageParameters = Map("userClass" -> udt.userClass.getName),
-      cause = null)
-  }
-
   def cannotFindEncoderForTypeError(typeName: String): 
SparkUnsupportedOperationException = {
     new SparkUnsupportedOperationException(
       errorClass = "ENCODER_NOT_FOUND",
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
index 05ccaf2cbda0..008aa976d605 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
@@ -75,6 +75,26 @@ class ExamplePointUDT extends UserDefinedType[ExamplePoint] {
   private[spark] override def asNullable: ExamplePointUDT = this
 }
 
+class ExamplePointNotAnnotated(val x: Double, val y: Double) extends 
Serializable {
+  private val inner = new ExamplePoint(x, y)
+  override def hashCode: Int = inner.hashCode
+  override def equals(that: Any): Boolean = {
+    that match {
+      case e: ExamplePointNotAnnotated => inner.equals(e.inner)
+      case _ => false
+    }
+  }
+}
+class ExamplePointNotAnnotatedUDT extends 
UserDefinedType[ExamplePointNotAnnotated] {
+  override def sqlType: DataType = DoubleType
+  override def serialize(p: ExamplePointNotAnnotated): Double = p.x
+  override def deserialize(datum: Any): ExamplePointNotAnnotated = {
+    val x = datum.asInstanceOf[Double]
+    new ExamplePointNotAnnotated(x, 3.14 * datum.asInstanceOf[Double])
+  }
+  override def userClass: Class[ExamplePointNotAnnotated] = 
classOf[ExamplePointNotAnnotated]
+}
+
 class RowEncoderSuite extends CodegenInterpretedPlanTest {
 
   private val structOfString = new StructType().add("str", StringType)
@@ -111,7 +131,8 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest {
       .add("binary", BinaryType)
       .add("date", DateType)
       .add("timestamp", TimestampType)
-      .add("udt", new ExamplePointUDT))
+      .add("udt", new ExamplePointUDT)
+      .add("udtNotAnnotated", new ExamplePointNotAnnotatedUDT))
 
   encodeDecodeTest(
     new StructType()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to