This is an automated email from the ASF dual-hosted git repository.
maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ba5af225f97 [SPARK-38108][SQL] Use error classes in the compilation
errors of UDF/UDAF
ba5af225f97 is described below
commit ba5af225f97da147ea72d0f614165fb28e7fc568
Author: Tengfei Huang <[email protected]>
AuthorDate: Sun Apr 10 14:34:04 2022 +0300
[SPARK-38108][SQL] Use error classes in the compilation errors of UDF/UDAF
### What changes were proposed in this pull request?
Migrate the following UDF/UDAF compilation errors in QueryCompilationErrors:
- noHandlerForUDAFError -> NO_HANDLER_FOR_UDAF
- unexpectedEvalTypesForUDFsError -> IllegalStateException
- usingUntypedScalaUDFError -> UNTYPED_SCALA_UDF
- udfClassDoesNotImplementAnyUDFInterfaceError -> NO_UDF_INTERFACE_ERROR
- udfClassNotAllowedToImplementMultiUDFInterfacesError ->
MULTI_UDF_INTERFACE_ERROR
- udfClassWithTooManyTypeArgumentsError ->
UDF_WITH_TOO_MANY_TYPE_ARGUMENT_ERROR
### Why are the changes needed?
Porting UDF/UDAF compilation errors to the new error framework.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT added.
Closes #36064 from ivoson/SPARK-38108.
Lead-authored-by: Tengfei Huang <[email protected]>
Co-authored-by: Huang Tengfei <[email protected]>
Co-authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
---
core/src/main/resources/error/error-classes.json | 12 +++
.../catalog/InvalidUDFClassException.scala | 10 +-
.../spark/sql/errors/QueryCompilationErrors.scala | 38 +++----
.../org/apache/spark/sql/UDFRegistration.scala | 2 +-
.../sql/execution/python/ExtractPythonUDFs.scala | 5 +-
.../org/apache/spark/sql/api/java/UDF23Test.java} | 19 ++--
.../test/scala/org/apache/spark/sql/UDFSuite.scala | 5 -
.../sql/errors/QueryCompilationErrorsSuite.scala | 114 ++++++++++++++++++++-
8 files changed, 165 insertions(+), 40 deletions(-)
diff --git a/core/src/main/resources/error/error-classes.json
b/core/src/main/resources/error/error-classes.json
index 855d3c5cd6e..fddfb004e1e 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -134,6 +134,9 @@
"message" : [ "Unknown static partition column: %s" ],
"sqlState" : "42000"
},
+ "MULTI_UDF_INTERFACE_ERROR" : {
+ "message" : [ "Not allowed to implement multiple UDF interfaces, UDF class
%s" ]
+ },
"NON_LITERAL_PIVOT_VALUES" : {
"message" : [ "Literal expressions required for pivot values, found '%s'"
],
"sqlState" : "42000"
@@ -142,6 +145,12 @@
"message" : [ "PARTITION clause cannot contain a non-partition column
name: %s" ],
"sqlState" : "42000"
},
+ "NO_HANDLER_FOR_UDAF" : {
+ "message" : [ "No handler for UDAF '%s'. Use
sparkSession.udf.register(...) instead." ]
+ },
+ "NO_UDF_INTERFACE_ERROR" : {
+ "message" : [ "UDF class %s doesn't implement any UDF interface" ]
+ },
"PARSE_CHAR_MISSING_LENGTH" : {
"message" : [ "DataType %s requires a length parameter, for example
%s(10). Please specify the length." ],
"sqlState" : "42000"
@@ -187,6 +196,9 @@
"UNSUPPORTED_OPERATION" : {
"message" : [ "The operation is not supported: %s" ]
},
+ "UNTYPED_SCALA_UDF" : {
+ "message" : [ "You're using untyped Scala UDF, which does not have the
input type information. Spark may blindly pass null to the Scala closure with
primitive-type argument, and the closure will see the default value of the Java
type for the null argument, e.g. `udf((x: Int) => x, IntegerType)`, the result
is 0 for null input. To get rid of this error, you could:\n1. use typed Scala
UDF APIs(without return type parameter), e.g. `udf((x: Int) => x)`\n2. use Java
UDF APIs, e.g. `udf(ne [...]
+ },
"WRITING_JOB_ABORTED" : {
"message" : [ "Writing job aborted" ],
"sqlState" : "40000"
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala
index bc02efd5113..0e5e52a9c90 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala
@@ -17,12 +17,18 @@
package org.apache.spark.sql.catalyst.catalog
+import org.apache.spark.SparkThrowableHelper
import org.apache.spark.sql.AnalysisException
/**
* Thrown when a query failed for invalid function class, usually because a SQL
* function's class does not follow the rules of the UDF/UDAF/UDTF class
definition.
*/
-class InvalidUDFClassException private[sql](message: String)
- extends AnalysisException(message, None, None, None, None) {
+class InvalidUDFClassException private[sql](
+ message: String,
+ errorClass: Option[String] = None)
+ extends AnalysisException(message = message, errorClass = errorClass) {
+
+ def this(errorClass: String, messageParameters: Array[String]) =
+ this(SparkThrowableHelper.getMessage(errorClass, messageParameters),
Some(errorClass))
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 57ed7da7b20..b6b64804904 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -764,8 +764,9 @@ object QueryCompilationErrors {
}
def noHandlerForUDAFError(name: String): Throwable = {
- new InvalidUDFClassException(s"No handler for UDAF '$name'. " +
- "Use sparkSession.udf.register(...) instead.")
+ new InvalidUDFClassException(
+ errorClass = "NO_HANDLER_FOR_UDAF",
+ messageParameters = Array(name))
}
def batchWriteCapabilityError(
@@ -1357,12 +1358,6 @@ object QueryCompilationErrors {
""".stripMargin.replaceAll("\n", " "))
}
- def unexpectedEvalTypesForUDFsError(evalTypes: Set[Int]): Throwable = {
- new AnalysisException(
- s"Expected udfs have the same evalType but got different evalTypes: " +
- s"${evalTypes.mkString(",")}")
- }
-
def ambiguousFieldNameError(
fieldName: Seq[String], numMatches: Int, context: Origin): Throwable = {
new AnalysisException(
@@ -2264,17 +2259,9 @@ object QueryCompilationErrors {
}
def usingUntypedScalaUDFError(): Throwable = {
- new AnalysisException("You're using untyped Scala UDF, which does not have
the input type " +
- "information. Spark may blindly pass null to the Scala closure with
primitive-type " +
- "argument, and the closure will see the default value of the Java type
for the null " +
- "argument, e.g. `udf((x: Int) => x, IntegerType)`, the result is 0 for
null input. " +
- "To get rid of this error, you could:\n" +
- "1. use typed Scala UDF APIs(without return type parameter), e.g.
`udf((x: Int) => x)`\n" +
- "2. use Java UDF APIs, e.g. `udf(new UDF1[String, Integer] { " +
- "override def call(s: String): Integer = s.length() }, IntegerType)`, " +
- "if input types are all non primitive\n" +
- s"3. set ${SQLConf.LEGACY_ALLOW_UNTYPED_SCALA_UDF.key} to true and " +
- s"use this API with caution")
+ new AnalysisException(
+ errorClass = "UNTYPED_SCALA_UDF",
+ messageParameters = Array.empty)
}
def aggregationFunctionAppliedOnNonNumericColumnError(colName: String):
Throwable = {
@@ -2310,16 +2297,21 @@ object QueryCompilationErrors {
}
def udfClassDoesNotImplementAnyUDFInterfaceError(className: String):
Throwable = {
- new AnalysisException(s"UDF class $className doesn't implement any UDF
interface")
+ new AnalysisException(
+ errorClass = "NO_UDF_INTERFACE_ERROR",
+ messageParameters = Array(className))
}
- def udfClassNotAllowedToImplementMultiUDFInterfacesError(className: String):
Throwable = {
+ def udfClassImplementMultiUDFInterfacesError(className: String): Throwable =
{
new AnalysisException(
- s"It is invalid to implement multiple UDF interfaces, UDF class
$className")
+ errorClass = "MULTI_UDF_INTERFACE_ERROR",
+ messageParameters = Array(className))
}
def udfClassWithTooManyTypeArgumentsError(n: Int): Throwable = {
- new AnalysisException(s"UDF class with $n type arguments is not
supported.")
+ new AnalysisException(
+ errorClass = "UNSUPPORTED_FEATURE",
+ messageParameters = Array(s"UDF class with $n type arguments"))
}
def classWithoutPublicNonArgumentConstructorError(className: String):
Throwable = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index 366aa72a409..fb7323a48cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -661,7 +661,7 @@ class UDFRegistration private[sql] (functionRegistry:
FunctionRegistry) extends
if (udfInterfaces.length == 0) {
throw
QueryCompilationErrors.udfClassDoesNotImplementAnyUDFInterfaceError(className)
} else if (udfInterfaces.length > 1) {
- throw
QueryCompilationErrors.udfClassNotAllowedToImplementMultiUDFInterfacesError(className)
+ throw
QueryCompilationErrors.udfClassImplementMultiUDFInterfacesError(className)
} else {
try {
val udf = clazz.getConstructor().newInstance()
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index a809ea07d0e..fb90187a8b1 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -26,7 +26,6 @@ import
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern._
-import org.apache.spark.sql.errors.QueryCompilationErrors
/**
@@ -264,7 +263,9 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with
PredicateHelper {
val evalTypes = validUdfs.map(_.evalType).toSet
if (evalTypes.size != 1) {
- throw
QueryCompilationErrors.unexpectedEvalTypesForUDFsError(evalTypes)
+ throw new IllegalStateException(
+ "Expected udfs have the same evalType but got different
evalTypes: " +
+ evalTypes.mkString(","))
}
val evalType = evalTypes.head
val evaluation = evalType match {
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala
b/sql/core/src/test/java/org/apache/spark/sql/api/java/UDF23Test.java
similarity index 58%
copy from
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala
copy to sql/core/src/test/java/org/apache/spark/sql/api/java/UDF23Test.java
index bc02efd5113..545af5987b9 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala
+++ b/sql/core/src/test/java/org/apache/spark/sql/api/java/UDF23Test.java
@@ -15,14 +15,21 @@
* limitations under the License.
*/
-package org.apache.spark.sql.catalyst.catalog
+package org.apache.spark.sql.api.java;
-import org.apache.spark.sql.AnalysisException
+import java.io.Serializable;
+
+import org.apache.spark.annotation.Stable;
/**
- * Thrown when a query failed for invalid function class, usually because a SQL
- * function's class does not follow the rules of the UDF/UDAF/UDTF class
definition.
+ * A Spark SQL UDF that has 23 arguments for test.
*/
-class InvalidUDFClassException private[sql](message: String)
- extends AnalysisException(message, None, None, None, None) {
+@Stable
+public interface UDF23Test<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12,
+ T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, R> extends
Serializable {
+
+ R call(
+ T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10,
+ T11 t11, T12 t12, T13 t13, T14 t14, T15 t15, T16 t16, T17 t17, T18 t18,
+ T19 t19, T20 t20, T21 t21, T22 t22, T23 t23) throws Exception;
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 1393eba40b7..8fd7aa3e0cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -471,11 +471,6 @@ class UDFSuite extends QueryTest with SharedSparkSession {
}
- test("use untyped Scala UDF should fail by default") {
- val e = intercept[AnalysisException](udf((x: Int) => x, IntegerType))
- assert(e.getMessage.contains("You're using untyped Scala UDF"))
- }
-
test("SPARK-26308: udf with decimal") {
val df1 = spark.createDataFrame(
sparkContext.parallelize(Seq(Row(new
BigDecimal("2011000000000002456556")))),
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
index 4d776caacf3..e9a86df7f07 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
@@ -18,9 +18,12 @@
package org.apache.spark.sql.errors
import org.apache.spark.sql.{AnalysisException, IntegratedUDFTestUtils,
QueryTest}
-import org.apache.spark.sql.functions.{grouping, grouping_id, sum}
+import org.apache.spark.sql.api.java.{UDF1, UDF2, UDF23Test}
+import org.apache.spark.sql.expressions.SparkUserDefinedFunction
+import org.apache.spark.sql.functions.{grouping, grouping_id, sum, udf}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, StringType}
case class StringLongClass(a: String, b: Long)
@@ -171,4 +174,113 @@ class QueryCompilationErrorsSuite extends QueryTest with
SharedSparkSession {
"The feature is not supported: " +
"Pandas UDF aggregate expressions don't support pivot.")
}
+
+ test("NO_HANDLER_FOR_UDAF: No handler for UDAF error") {
+ val functionName = "myCast"
+ withUserDefinedFunction(functionName -> true) {
+ sql(
+ s"""
+ |CREATE TEMPORARY FUNCTION $functionName
+ |AS 'org.apache.spark.sql.errors.MyCastToString'
+ |""".stripMargin)
+
+ val e = intercept[AnalysisException] (
+ sql(s"SELECT $functionName(123) as value")
+ )
+
+ assert(e.errorClass === Some("NO_HANDLER_FOR_UDAF"))
+ assert(e.message ===
+ "No handler for UDAF 'org.apache.spark.sql.errors.MyCastToString'. " +
+ "Use sparkSession.udf.register(...) instead.")
+ }
+ }
+
+ test("UNTYPED_SCALA_UDF: use untyped Scala UDF should fail by default") {
+ val e = intercept[AnalysisException](udf((x: Int) => x, IntegerType))
+
+ assert(e.errorClass === Some("UNTYPED_SCALA_UDF"))
+ assert(e.message ===
+ "You're using untyped Scala UDF, which does not have the input type " +
+ "information. Spark may blindly pass null to the Scala closure with
primitive-type " +
+ "argument, and the closure will see the default value of the Java type
for the null " +
+ "argument, e.g. `udf((x: Int) => x, IntegerType)`, the result is 0 for
null input. " +
+ "To get rid of this error, you could:\n" +
+ "1. use typed Scala UDF APIs(without return type parameter), e.g.
`udf((x: Int) => x)`\n" +
+ "2. use Java UDF APIs, e.g. `udf(new UDF1[String, Integer] { " +
+ "override def call(s: String): Integer = s.length() }, IntegerType)`, " +
+ "if input types are all non primitive\n" +
+ s"3. set ${SQLConf.LEGACY_ALLOW_UNTYPED_SCALA_UDF.key} to true and " +
+ s"use this API with caution")
+ }
+
+ test("NO_UDF_INTERFACE_ERROR: java udf class does not implement any udf
interface") {
+ val className = "org.apache.spark.sql.errors.MyCastToString"
+ val e = intercept[AnalysisException](
+ spark.udf.registerJava(
+ "myCast",
+ className,
+ StringType)
+ )
+
+ assert(e.errorClass === Some("NO_UDF_INTERFACE_ERROR"))
+ assert(e.message ===
+ s"UDF class $className doesn't implement any UDF interface")
+ }
+
+ test("MULTI_UDF_INTERFACE_ERROR: java udf implement multi UDF interface") {
+ val className = "org.apache.spark.sql.errors.MySum"
+ val e = intercept[AnalysisException](
+ spark.udf.registerJava(
+ "mySum",
+ className,
+ StringType)
+ )
+
+ assert(e.errorClass === Some("MULTI_UDF_INTERFACE_ERROR"))
+ assert(e.message ===
+ s"Not allowed to implement multiple UDF interfaces, UDF class
$className")
+ }
+
+ test("UNSUPPORTED_FEATURE: java udf with too many type arguments") {
+ val className = "org.apache.spark.sql.errors.MultiIntSum"
+ val e = intercept[AnalysisException](
+ spark.udf.registerJava(
+ "mySum",
+ className,
+ StringType)
+ )
+
+ assert(e.errorClass === Some("UNSUPPORTED_FEATURE"))
+ assert(e.getSqlState === "0A000")
+ assert(e.message === "The feature is not supported: UDF class with 24 type
arguments")
+ }
+}
+
+class MyCastToString extends SparkUserDefinedFunction(
+ (input: Any) => if (input == null) {
+ null
+ } else {
+ input.toString
+ },
+ StringType,
+ inputEncoders = Seq.fill(1)(None))
+
+class MySum extends UDF1[Int, Int] with UDF2[Int, Int, Int] {
+ override def call(t1: Int): Int = t1
+
+ override def call(t1: Int, t2: Int): Int = t1 + t2
+}
+
+class MultiIntSum extends
+ UDF23Test[Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int,
+ Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int] {
+ // scalastyle:off argcount
+ override def call(
+ t1: Int, t2: Int, t3: Int, t4: Int, t5: Int, t6: Int, t7: Int, t8: Int,
+ t9: Int, t10: Int, t11: Int, t12: Int, t13: Int, t14: Int, t15: Int,
t16: Int,
+ t17: Int, t18: Int, t19: Int, t20: Int, t21: Int, t22: Int, t23: Int):
Int = {
+ t1 + t2 + t3 + t4 + t5 + t6 + t7 + t8 + t9 + t10 +
+ t11 + t12 + t13 + t14 + t15 + t16 + t17 + t18 + t19 + t20 + t21 + t22 +
t23
+ }
+ // scalastyle:on argcount
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]