This is an automated email from the ASF dual-hosted git repository. maxgekk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ba5af225f97 [SPARK-38108][SQL] Use error classes in the compilation errors of UDF/UDAF ba5af225f97 is described below commit ba5af225f97da147ea72d0f614165fb28e7fc568 Author: Tengfei Huang <tengfe...@gmail.com> AuthorDate: Sun Apr 10 14:34:04 2022 +0300 [SPARK-38108][SQL] Use error classes in the compilation errors of UDF/UDAF ### What changes were proposed in this pull request? Migrate the following UDF/UDAF compilation errors in QueryCompilationErrors: - noHandlerForUDAFError -> NO_HANDLER_FOR_UDAF - unexpectedEvalTypesForUDFsError -> IllegalStateException - usingUntypedScalaUDFError -> UNTYPED_SCALA_UDF - udfClassDoesNotImplementAnyUDFInterfaceError -> NO_UDF_INTERFACE_ERROR - udfClassNotAllowedToImplementMultiUDFInterfacesError -> MULTI_UDF_INTERFACE_ERROR - udfClassWithTooManyTypeArgumentsError -> UNSUPPORTED_FEATURE ### Why are the changes needed? To port the UDF/UDAF compilation errors to the new error framework. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests added. Closes #36064 from ivoson/SPARK-38108. 
Lead-authored-by: Tengfei Huang <tengfe...@gmail.com> Co-authored-by: Huang Tengfei <tengfe...@gmail.com> Co-authored-by: Maxim Gekk <max.g...@gmail.com> Signed-off-by: Max Gekk <max.g...@gmail.com> --- core/src/main/resources/error/error-classes.json | 12 +++ .../catalog/InvalidUDFClassException.scala | 10 +- .../spark/sql/errors/QueryCompilationErrors.scala | 38 +++---- .../org/apache/spark/sql/UDFRegistration.scala | 2 +- .../sql/execution/python/ExtractPythonUDFs.scala | 5 +- .../org/apache/spark/sql/api/java/UDF23Test.java} | 19 ++-- .../test/scala/org/apache/spark/sql/UDFSuite.scala | 5 - .../sql/errors/QueryCompilationErrorsSuite.scala | 114 ++++++++++++++++++++- 8 files changed, 165 insertions(+), 40 deletions(-) diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json index 855d3c5cd6e..fddfb004e1e 100644 --- a/core/src/main/resources/error/error-classes.json +++ b/core/src/main/resources/error/error-classes.json @@ -134,6 +134,9 @@ "message" : [ "Unknown static partition column: %s" ], "sqlState" : "42000" }, + "MULTI_UDF_INTERFACE_ERROR" : { + "message" : [ "Not allowed to implement multiple UDF interfaces, UDF class %s" ] + }, "NON_LITERAL_PIVOT_VALUES" : { "message" : [ "Literal expressions required for pivot values, found '%s'" ], "sqlState" : "42000" @@ -142,6 +145,12 @@ "message" : [ "PARTITION clause cannot contain a non-partition column name: %s" ], "sqlState" : "42000" }, + "NO_HANDLER_FOR_UDAF" : { + "message" : [ "No handler for UDAF '%s'. Use sparkSession.udf.register(...) instead." ] + }, + "NO_UDF_INTERFACE_ERROR" : { + "message" : [ "UDF class %s doesn't implement any UDF interface" ] + }, "PARSE_CHAR_MISSING_LENGTH" : { "message" : [ "DataType %s requires a length parameter, for example %s(10). Please specify the length." 
], "sqlState" : "42000" @@ -187,6 +196,9 @@ "UNSUPPORTED_OPERATION" : { "message" : [ "The operation is not supported: %s" ] }, + "UNTYPED_SCALA_UDF" : { + "message" : [ "You're using untyped Scala UDF, which does not have the input type information. Spark may blindly pass null to the Scala closure with primitive-type argument, and the closure will see the default value of the Java type for the null argument, e.g. `udf((x: Int) => x, IntegerType)`, the result is 0 for null input. To get rid of this error, you could:\n1. use typed Scala UDF APIs(without return type parameter), e.g. `udf((x: Int) => x)`\n2. use Java UDF APIs, e.g. `udf(ne [...] + }, "WRITING_JOB_ABORTED" : { "message" : [ "Writing job aborted" ], "sqlState" : "40000" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala index bc02efd5113..0e5e52a9c90 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala @@ -17,12 +17,18 @@ package org.apache.spark.sql.catalyst.catalog +import org.apache.spark.SparkThrowableHelper import org.apache.spark.sql.AnalysisException /** * Thrown when a query failed for invalid function class, usually because a SQL * function's class does not follow the rules of the UDF/UDAF/UDTF class definition. 
*/ -class InvalidUDFClassException private[sql](message: String) - extends AnalysisException(message, None, None, None, None) { +class InvalidUDFClassException private[sql]( + message: String, + errorClass: Option[String] = None) + extends AnalysisException(message = message, errorClass = errorClass) { + + def this(errorClass: String, messageParameters: Array[String]) = + this(SparkThrowableHelper.getMessage(errorClass, messageParameters), Some(errorClass)) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 57ed7da7b20..b6b64804904 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -764,8 +764,9 @@ object QueryCompilationErrors { } def noHandlerForUDAFError(name: String): Throwable = { - new InvalidUDFClassException(s"No handler for UDAF '$name'. " + - "Use sparkSession.udf.register(...) instead.") + new InvalidUDFClassException( + errorClass = "NO_HANDLER_FOR_UDAF", + messageParameters = Array(name)) } def batchWriteCapabilityError( @@ -1357,12 +1358,6 @@ object QueryCompilationErrors { """.stripMargin.replaceAll("\n", " ")) } - def unexpectedEvalTypesForUDFsError(evalTypes: Set[Int]): Throwable = { - new AnalysisException( - s"Expected udfs have the same evalType but got different evalTypes: " + - s"${evalTypes.mkString(",")}") - } - def ambiguousFieldNameError( fieldName: Seq[String], numMatches: Int, context: Origin): Throwable = { new AnalysisException( @@ -2264,17 +2259,9 @@ object QueryCompilationErrors { } def usingUntypedScalaUDFError(): Throwable = { - new AnalysisException("You're using untyped Scala UDF, which does not have the input type " + - "information. 
Spark may blindly pass null to the Scala closure with primitive-type " + - "argument, and the closure will see the default value of the Java type for the null " + - "argument, e.g. `udf((x: Int) => x, IntegerType)`, the result is 0 for null input. " + - "To get rid of this error, you could:\n" + - "1. use typed Scala UDF APIs(without return type parameter), e.g. `udf((x: Int) => x)`\n" + - "2. use Java UDF APIs, e.g. `udf(new UDF1[String, Integer] { " + - "override def call(s: String): Integer = s.length() }, IntegerType)`, " + - "if input types are all non primitive\n" + - s"3. set ${SQLConf.LEGACY_ALLOW_UNTYPED_SCALA_UDF.key} to true and " + - s"use this API with caution") + new AnalysisException( + errorClass = "UNTYPED_SCALA_UDF", + messageParameters = Array.empty) } def aggregationFunctionAppliedOnNonNumericColumnError(colName: String): Throwable = { @@ -2310,16 +2297,21 @@ object QueryCompilationErrors { } def udfClassDoesNotImplementAnyUDFInterfaceError(className: String): Throwable = { - new AnalysisException(s"UDF class $className doesn't implement any UDF interface") + new AnalysisException( + errorClass = "NO_UDF_INTERFACE_ERROR", + messageParameters = Array(className)) } - def udfClassNotAllowedToImplementMultiUDFInterfacesError(className: String): Throwable = { + def udfClassImplementMultiUDFInterfacesError(className: String): Throwable = { new AnalysisException( - s"It is invalid to implement multiple UDF interfaces, UDF class $className") + errorClass = "MULTI_UDF_INTERFACE_ERROR", + messageParameters = Array(className)) } def udfClassWithTooManyTypeArgumentsError(n: Int): Throwable = { - new AnalysisException(s"UDF class with $n type arguments is not supported.") + new AnalysisException( + errorClass = "UNSUPPORTED_FEATURE", + messageParameters = Array(s"UDF class with $n type arguments")) } def classWithoutPublicNonArgumentConstructorError(className: String): Throwable = { diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala index 366aa72a409..fb7323a48cc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala @@ -661,7 +661,7 @@ class UDFRegistration private[sql] (functionRegistry: FunctionRegistry) extends if (udfInterfaces.length == 0) { throw QueryCompilationErrors.udfClassDoesNotImplementAnyUDFInterfaceError(className) } else if (udfInterfaces.length > 1) { - throw QueryCompilationErrors.udfClassNotAllowedToImplementMultiUDFInterfacesError(className) + throw QueryCompilationErrors.udfClassImplementMultiUDFInterfacesError(className) } else { try { val udf = clazz.getConstructor().newInstance() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala index a809ea07d0e..fb90187a8b1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern._ -import org.apache.spark.sql.errors.QueryCompilationErrors /** @@ -264,7 +263,9 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper { val evalTypes = validUdfs.map(_.evalType).toSet if (evalTypes.size != 1) { - throw QueryCompilationErrors.unexpectedEvalTypesForUDFsError(evalTypes) + throw new IllegalStateException( + "Expected udfs have the same evalType but got different evalTypes: " + + evalTypes.mkString(",")) } val evalType = evalTypes.head val evaluation = evalType match { diff 
--git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala b/sql/core/src/test/java/org/apache/spark/sql/api/java/UDF23Test.java similarity index 58% copy from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala copy to sql/core/src/test/java/org/apache/spark/sql/api/java/UDF23Test.java index bc02efd5113..545af5987b9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala +++ b/sql/core/src/test/java/org/apache/spark/sql/api/java/UDF23Test.java @@ -15,14 +15,21 @@ * limitations under the License. */ -package org.apache.spark.sql.catalyst.catalog +package org.apache.spark.sql.api.java; -import org.apache.spark.sql.AnalysisException +import java.io.Serializable; + +import org.apache.spark.annotation.Stable; /** - * Thrown when a query failed for invalid function class, usually because a SQL - * function's class does not follow the rules of the UDF/UDAF/UDTF class definition. + * A Spark SQL UDF that has 23 arguments for test. 
*/ -class InvalidUDFClassException private[sql](message: String) - extends AnalysisException(message, None, None, None, None) { +@Stable +public interface UDF23Test<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, + T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, R> extends Serializable { + + R call( + T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10, + T11 t11, T12 t12, T13 t13, T14 t14, T15 t15, T16 t16, T17 t17, T18 t18, + T19 t19, T20 t20, T21 t21, T22 t22, T23 t23) throws Exception; } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala index 1393eba40b7..8fd7aa3e0cd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala @@ -471,11 +471,6 @@ class UDFSuite extends QueryTest with SharedSparkSession { } - test("use untyped Scala UDF should fail by default") { - val e = intercept[AnalysisException](udf((x: Int) => x, IntegerType)) - assert(e.getMessage.contains("You're using untyped Scala UDF")) - } - test("SPARK-26308: udf with decimal") { val df1 = spark.createDataFrame( sparkContext.parallelize(Seq(Row(new BigDecimal("2011000000000002456556")))), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala index 4d776caacf3..e9a86df7f07 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala @@ -18,9 +18,12 @@ package org.apache.spark.sql.errors import org.apache.spark.sql.{AnalysisException, IntegratedUDFTestUtils, QueryTest} -import org.apache.spark.sql.functions.{grouping, grouping_id, sum} +import org.apache.spark.sql.api.java.{UDF1, UDF2, UDF23Test} +import 
org.apache.spark.sql.expressions.SparkUserDefinedFunction +import org.apache.spark.sql.functions.{grouping, grouping_id, sum, udf} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.{IntegerType, StringType} case class StringLongClass(a: String, b: Long) @@ -171,4 +174,113 @@ class QueryCompilationErrorsSuite extends QueryTest with SharedSparkSession { "The feature is not supported: " + "Pandas UDF aggregate expressions don't support pivot.") } + + test("NO_HANDLER_FOR_UDAF: No handler for UDAF error") { + val functionName = "myCast" + withUserDefinedFunction(functionName -> true) { + sql( + s""" + |CREATE TEMPORARY FUNCTION $functionName + |AS 'org.apache.spark.sql.errors.MyCastToString' + |""".stripMargin) + + val e = intercept[AnalysisException] ( + sql(s"SELECT $functionName(123) as value") + ) + + assert(e.errorClass === Some("NO_HANDLER_FOR_UDAF")) + assert(e.message === + "No handler for UDAF 'org.apache.spark.sql.errors.MyCastToString'. " + + "Use sparkSession.udf.register(...) instead.") + } + } + + test("UNTYPED_SCALA_UDF: use untyped Scala UDF should fail by default") { + val e = intercept[AnalysisException](udf((x: Int) => x, IntegerType)) + + assert(e.errorClass === Some("UNTYPED_SCALA_UDF")) + assert(e.message === + "You're using untyped Scala UDF, which does not have the input type " + + "information. Spark may blindly pass null to the Scala closure with primitive-type " + + "argument, and the closure will see the default value of the Java type for the null " + + "argument, e.g. `udf((x: Int) => x, IntegerType)`, the result is 0 for null input. " + + "To get rid of this error, you could:\n" + + "1. use typed Scala UDF APIs(without return type parameter), e.g. `udf((x: Int) => x)`\n" + + "2. use Java UDF APIs, e.g. 
`udf(new UDF1[String, Integer] { " + + "override def call(s: String): Integer = s.length() }, IntegerType)`, " + + "if input types are all non primitive\n" + + s"3. set ${SQLConf.LEGACY_ALLOW_UNTYPED_SCALA_UDF.key} to true and " + + s"use this API with caution") + } + + test("NO_UDF_INTERFACE_ERROR: java udf class does not implement any udf interface") { + val className = "org.apache.spark.sql.errors.MyCastToString" + val e = intercept[AnalysisException]( + spark.udf.registerJava( + "myCast", + className, + StringType) + ) + + assert(e.errorClass === Some("NO_UDF_INTERFACE_ERROR")) + assert(e.message === + s"UDF class $className doesn't implement any UDF interface") + } + + test("MULTI_UDF_INTERFACE_ERROR: java udf implement multi UDF interface") { + val className = "org.apache.spark.sql.errors.MySum" + val e = intercept[AnalysisException]( + spark.udf.registerJava( + "mySum", + className, + StringType) + ) + + assert(e.errorClass === Some("MULTI_UDF_INTERFACE_ERROR")) + assert(e.message === + s"Not allowed to implement multiple UDF interfaces, UDF class $className") + } + + test("UNSUPPORTED_FEATURE: java udf with too many type arguments") { + val className = "org.apache.spark.sql.errors.MultiIntSum" + val e = intercept[AnalysisException]( + spark.udf.registerJava( + "mySum", + className, + StringType) + ) + + assert(e.errorClass === Some("UNSUPPORTED_FEATURE")) + assert(e.getSqlState === "0A000") + assert(e.message === "The feature is not supported: UDF class with 24 type arguments") + } +} + +class MyCastToString extends SparkUserDefinedFunction( + (input: Any) => if (input == null) { + null + } else { + input.toString + }, + StringType, + inputEncoders = Seq.fill(1)(None)) + +class MySum extends UDF1[Int, Int] with UDF2[Int, Int, Int] { + override def call(t1: Int): Int = t1 + + override def call(t1: Int, t2: Int): Int = t1 + t2 +} + +class MultiIntSum extends + UDF23Test[Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, + Int, Int, Int, Int, Int, 
Int, Int, Int, Int, Int, Int, Int] { + // scalastyle:off argcount + override def call( + t1: Int, t2: Int, t3: Int, t4: Int, t5: Int, t6: Int, t7: Int, t8: Int, + t9: Int, t10: Int, t11: Int, t12: Int, t13: Int, t14: Int, t15: Int, t16: Int, + t17: Int, t18: Int, t19: Int, t20: Int, t21: Int, t22: Int, t23: Int): Int = { + t1 + t2 + t3 + t4 + t5 + t6 + t7 + t8 + t9 + t10 + + t11 + t12 + t13 + t14 + t15 + t16 + t17 + t18 + t19 + t20 + t21 + t22 + t23 + } + // scalastyle:on argcount } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org