This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 077b27ceb34b [SPARK-52671][SQL] RowEncoder shall not lookup a resolved UDT
077b27ceb34b is described below

commit 077b27ceb34bff3f7fefdbb18f590385ec353e0a
Author: Kent Yao <y...@apache.org>
AuthorDate: Fri Jul 4 19:21:56 2025 -0700

    [SPARK-52671][SQL] RowEncoder shall not lookup a resolved UDT

    ### What changes were proposed in this pull request?

    This PR fixes the bug shown below:

    ```sql
    spark-sql (default)> select submissionTime from bbb;
    xxx
    spark-sql (default)> cache table a as select submissionTime from bbb;
    org.apache.spark.SparkException: xxx is not annotated with SQLUserDefinedType nor registered with UDTRegistration.}
      at org.apache.spark.sql.errors.ExecutionErrors.userDefinedTypeNotAnnotatedAndRegisteredError(ExecutionErrors.scala:167)
      at org.apache.spark.sql.errors.ExecutionErrors.userDefinedTypeNotAnnotatedAndRegisteredError$(ExecutionErrors.scala:163)
      at org.apache.spark.sql.errors.ExecutionErrors$.userDefinedTypeNotAnnotatedAndRegisteredError(ExecutionErrors.scala:259)
      at org.apache.spark.sql.catalyst.encoders.RowEncoder$.$anonfun$encoderForDataType$1(RowEncoder.scala:108)
      at scala.Option.getOrElse(Option.scala:201)
      at org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderForDataType(RowEncoder.scala:108)
      at org.apache.spark.sql.catalyst.encoders.RowEncoder$.$anonfun$encoderForDataType$2(RowEncoder.scala:128)
      at scala.collection.ArrayOps$.map$extension(ArrayOps.scala:936)
      at org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderForDataType(RowEncoder.scala:125)
      at org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderFor(RowEncoder.scala:69)
      at org.apache.spark.sql.catalyst.encoders.RowEncoder$.encoderFor(RowEncoder.scala:65)
      at org.apache.spark.sql.classic.Dataset.toDF(Dataset.scala:519)
      at org.apache.spark.sql.classic.Dataset.groupBy(Dataset.scala:941)
      at org.apache.spark.sql.classic.Dataset.count(Dataset.scala:1501)
    ```

    The RowEncoder tried to re-look up a UDT that was already resolved.

    ### Why are the changes needed?

    bugfix

    ### Does this PR introduce _any_ user-facing change?

    no

    ### How was this patch tested?

    modified tests in sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala

    ### Was this patch authored or co-authored using generative AI tooling?

    no

    Closes #51360 from yaooqinn/SPARK-52671.

    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../src/main/resources/error/error-conditions.json |  5 -----
 .../spark/sql/catalyst/encoders/RowEncoder.scala   | 13 ++----------
 .../apache/spark/sql/errors/ExecutionErrors.scala  |  9 +--------
 .../sql/catalyst/encoders/RowEncoderSuite.scala    | 23 +++++++++++++++++++++-
 4 files changed, 25 insertions(+), 25 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 000b1f524f20..cb7e2faaf2be 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -8345,11 +8345,6 @@
       "Failed to get outer pointer for <innerCls>."
     ]
   },
-  "_LEGACY_ERROR_TEMP_2155" : {
-    "message" : [
-      "<userClass> is not annotated with SQLUserDefinedType nor registered with UDTRegistration.}"
-    ]
-  },
   "_LEGACY_ERROR_TEMP_2163" : {
     "message" : [
       "Initial type <dataType> must be a <target>."
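For context, here is a minimal sketch of the failure mode at the RowEncoder level, mirroring what the new test added later in this diff exercises. The Point and PointUDT names are illustrative only, and because the UDT base class is a developer/internal API, the sketch assumes it is compiled under an org.apache.spark package:

```scala
// A sketch only: PointUDT is neither annotated with @SQLUserDefinedType nor registered
// via UDTRegistration, yet a schema can carry a resolved instance of it.
package org.apache.spark.sql.example

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._

class Point(val x: Double, val y: Double) extends Serializable

class PointUDT extends UserDefinedType[Point] {
  override def sqlType: DataType = DoubleType
  override def serialize(p: Point): Double = p.x
  override def deserialize(datum: Any): Point = new Point(datum.asInstanceOf[Double], 0.0)
  override def userClass: Class[Point] = classOf[Point]
}

object ResolvedUdtRepro {
  def main(args: Array[String]): Unit = {
    // The schema already holds a resolved UDT instance; before SPARK-52671,
    // RowEncoder.encoderFor still tried to look PointUDT up by its user class
    // and threw the SparkException quoted in the PR description.
    val schema = new StructType().add("p", new PointUDT)
    println(RowEncoder.encoderFor(schema))
  }
}
```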
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
index d5692bb85c4e..620278c66d21 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala
@@ -22,7 +22,7 @@ import scala.reflect.classTag
 
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{BinaryEncoder, BoxedBooleanEncoder, BoxedByteEncoder, BoxedDoubleEncoder, BoxedFloatEncoder, BoxedIntEncoder, BoxedLongEncoder, BoxedShortEncoder, CalendarIntervalEncoder, CharEncoder, DateEncoder, DayTimeIntervalEncoder, EncoderField, InstantEncoder, IterableEncoder, JavaDecimalEncoder, LocalDateEncoder, LocalDateTimeEncoder, LocalTimeEncoder, MapEncoder, NullEncoder, RowEncoder => AgnosticRowEncoder, StringEncoder, Timesta [...]
-import org.apache.spark.sql.errors.{DataTypeErrorsBase, ExecutionErrors}
+import org.apache.spark.sql.errors.DataTypeErrorsBase
 import org.apache.spark.sql.internal.SqlApiConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.ArrayImplicits._
@@ -99,16 +99,7 @@ object RowEncoder extends DataTypeErrorsBase {
     case p: PythonUserDefinedType =>
       // TODO check if this works.
       encoderForDataType(p.sqlType, lenient)
-    case udt: UserDefinedType[_] =>
-      val annotation = udt.userClass.getAnnotation(classOf[SQLUserDefinedType])
-      val udtClass: Class[_] = if (annotation != null) {
-        annotation.udt()
-      } else {
-        UDTRegistration.getUDTFor(udt.userClass.getName).getOrElse {
-          throw ExecutionErrors.userDefinedTypeNotAnnotatedAndRegisteredError(udt)
-        }
-      }
-      UDTEncoder(udt, udtClass.asInstanceOf[Class[_ <: UserDefinedType[_]]])
+    case udt: UserDefinedType[_] => UDTEncoder(udt, udt.getClass)
     case ArrayType(elementType, containsNull) =>
       IterableEncoder(
         classTag[mutable.ArraySeq[_]],
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/errors/ExecutionErrors.scala b/sql/api/src/main/scala/org/apache/spark/sql/errors/ExecutionErrors.scala
index 8124b1a4ab19..1a4369b172f8 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/errors/ExecutionErrors.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/ExecutionErrors.scala
@@ -24,7 +24,7 @@ import org.apache.arrow.vector.types.pojo.ArrowType
 import org.apache.spark.{QueryContext, SparkArithmeticException, SparkBuildInfo, SparkDateTimeException, SparkException, SparkRuntimeException, SparkUnsupportedOperationException, SparkUpgradeException}
 import org.apache.spark.sql.catalyst.WalkedTypePath
 import org.apache.spark.sql.internal.SqlApiConf
-import org.apache.spark.sql.types.{DataType, DoubleType, StringType, UserDefinedType}
+import org.apache.spark.sql.types.{DataType, DoubleType, StringType}
 import org.apache.spark.unsafe.types.UTF8String
 
 private[sql] trait ExecutionErrors extends DataTypeErrorsBase {
@@ -160,13 +160,6 @@ private[sql] trait ExecutionErrors extends DataTypeErrorsBase {
       messageParameters = Map("typeName" -> toSQLType(typeName)))
   }
 
-  def userDefinedTypeNotAnnotatedAndRegisteredError(udt: UserDefinedType[_]): Throwable = {
-    new SparkException(
-      errorClass = "_LEGACY_ERROR_TEMP_2155",
-      messageParameters = Map("userClass" -> udt.userClass.getName),
-      cause = null)
-  }
-
   def cannotFindEncoderForTypeError(typeName: String): SparkUnsupportedOperationException = {
     new SparkUnsupportedOperationException(
       errorClass = "ENCODER_NOT_FOUND",
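The gist of the change above: a resolved UserDefinedType instance already carries its own class, so UDTEncoder(udt, udt.getClass) needs no annotation or registry lookup. The sketch below contrasts the removed lookup path with the new one-liner; it reuses the illustrative Point and PointUDT classes from the earlier snippet and again assumes an org.apache.spark package, since UDTRegistration is an internal API:

```scala
// Sketch of the two resolution paths, using the illustrative classes defined earlier.
package org.apache.spark.sql.example

import org.apache.spark.sql.types.UDTRegistration

object UdtResolutionSketch {
  def main(args: Array[String]): Unit = {
    // Old path: start from the *user class* name and look the UDT class up in the registry
    // (or via the @SQLUserDefinedType annotation). This is what failed for a UDT that was
    // never registered, even though the schema already carried a resolved instance.
    UDTRegistration.register(classOf[Point].getName, classOf[PointUDT].getName)
    println(UDTRegistration.getUDTFor(classOf[Point].getName)) // Some(class ...PointUDT)

    // New path: a resolved UserDefinedType instance already knows its own class, so
    // udt.getClass is exactly the Class[_ <: UserDefinedType[_]] that UDTEncoder needs.
    val udt = new PointUDT
    println(udt.getClass == classOf[PointUDT]) // true, no lookup required
  }
}
```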
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
index 05ccaf2cbda0..008aa976d605 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/RowEncoderSuite.scala
@@ -75,6 +75,26 @@ class ExamplePointUDT extends UserDefinedType[ExamplePoint] {
   private[spark] override def asNullable: ExamplePointUDT = this
 }
 
+class ExamplePointNotAnnotated(val x: Double, val y: Double) extends Serializable {
+  private val inner = new ExamplePoint(x, y)
+  override def hashCode: Int = inner.hashCode
+  override def equals(that: Any): Boolean = {
+    that match {
+      case e: ExamplePointNotAnnotated => inner.equals(e.inner)
+      case _ => false
+    }
+  }
+}
+class ExamplePointNotAnnotatedUDT extends UserDefinedType[ExamplePointNotAnnotated] {
+  override def sqlType: DataType = DoubleType
+  override def serialize(p: ExamplePointNotAnnotated): Double = p.x
+  override def deserialize(datum: Any): ExamplePointNotAnnotated = {
+    val x = datum.asInstanceOf[Double]
+    new ExamplePointNotAnnotated(x, 3.14 * datum.asInstanceOf[Double])
+  }
+  override def userClass: Class[ExamplePointNotAnnotated] = classOf[ExamplePointNotAnnotated]
+}
+
 class RowEncoderSuite extends CodegenInterpretedPlanTest {
 
   private val structOfString = new StructType().add("str", StringType)
@@ -111,7 +131,8 @@ class RowEncoderSuite extends CodegenInterpretedPlanTest {
       .add("binary", BinaryType)
      .add("date", DateType)
      .add("timestamp", TimestampType)
-      .add("udt", new ExamplePointUDT))
+      .add("udt", new ExamplePointUDT)
+      .add("udtNotAnnotated", new ExamplePointNotAnnotatedUDT))
 
   encodeDecodeTest(
     new StructType()
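As a rough end-to-end check of what the modified test covers, the following sketch round-trips a Row through an encoder built for a schema containing the unregistered UDT. It assumes the catalyst test classpath, so that the ExamplePointNotAnnotated classes added above are visible; the object and value names are made up for illustration:

```scala
// Sketch: round-trip a Row holding an ExamplePointNotAnnotated through the row encoder.
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.{ExamplePointNotAnnotated, ExamplePointNotAnnotatedUDT, ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.types.StructType

object ResolvedUdtRoundTrip {
  def main(args: Array[String]): Unit = {
    val schema = new StructType().add("udtNotAnnotated", new ExamplePointNotAnnotatedUDT)
    // Before SPARK-52671 this threw, because encoderFor re-resolved the already resolved UDT
    // through the annotation/registry lookup.
    val bound = ExpressionEncoder(RowEncoder.encoderFor(schema)).resolveAndBind()
    val toRow = bound.createSerializer()
    val fromRow = bound.createDeserializer()
    // Serializes via ExamplePointNotAnnotatedUDT.serialize, then rebuilds the object
    // from the stored Double via deserialize.
    val back = fromRow(toRow(Row(new ExamplePointNotAnnotated(1.0, 2.0))))
    println(back)
  }
}
```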