This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ba5af225f97 [SPARK-38108][SQL] Use error classes in the compilation 
errors of UDF/UDAF
ba5af225f97 is described below

commit ba5af225f97da147ea72d0f614165fb28e7fc568
Author: Tengfei Huang <tengfe...@gmail.com>
AuthorDate: Sun Apr 10 14:34:04 2022 +0300

    [SPARK-38108][SQL] Use error classes in the compilation errors of UDF/UDAF
    
    ### What changes were proposed in this pull request?
    Migrate the following UDF/UDAF compilation errors in QueryCompilationErrors to error classes:
    - noHandlerForUDAFError -> NO_HANDLER_FOR_UDAF
    - unexpectedEvalTypesForUDFsError -> IllegalStateException
    - usingUntypedScalaUDFError -> UNTYPED_SCALA_UDF
    - udfClassDoesNotImplementAnyUDFInterfaceError -> NO_UDF_INTERFACE_ERROR
    - udfClassNotAllowedToImplementMultiUDFInterfacesError (renamed to
      udfClassImplementMultiUDFInterfacesError) -> MULTI_UDF_INTERFACE_ERROR
    - udfClassWithTooManyTypeArgumentsError -> UNSUPPORTED_FEATURE
    
    ### Why are the changes needed?
    To port the UDF/UDAF compilation errors to the new error class framework.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
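    Added unit tests to QueryCompilationErrorsSuite; the existing untyped Scala UDF test was moved there from UDFSuite.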
    
    Closes #36064 from ivoson/SPARK-38108.
    
    Lead-authored-by: Tengfei Huang <tengfe...@gmail.com>
    Co-authored-by: Huang Tengfei <tengfe...@gmail.com>
    Co-authored-by: Maxim Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 core/src/main/resources/error/error-classes.json   |  12 +++
 .../catalog/InvalidUDFClassException.scala         |  10 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  |  38 +++----
 .../org/apache/spark/sql/UDFRegistration.scala     |   2 +-
 .../sql/execution/python/ExtractPythonUDFs.scala   |   5 +-
 .../org/apache/spark/sql/api/java/UDF23Test.java}  |  19 ++--
 .../test/scala/org/apache/spark/sql/UDFSuite.scala |   5 -
 .../sql/errors/QueryCompilationErrorsSuite.scala   | 114 ++++++++++++++++++++-
 8 files changed, 165 insertions(+), 40 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json 
b/core/src/main/resources/error/error-classes.json
index 855d3c5cd6e..fddfb004e1e 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -134,6 +134,9 @@
     "message" : [ "Unknown static partition column: %s" ],
     "sqlState" : "42000"
   },
+  "MULTI_UDF_INTERFACE_ERROR" : {
+    "message" : [ "Not allowed to implement multiple UDF interfaces, UDF class 
%s" ]
+  },
   "NON_LITERAL_PIVOT_VALUES" : {
     "message" : [ "Literal expressions required for pivot values, found '%s'" 
],
     "sqlState" : "42000"
@@ -142,6 +145,12 @@
     "message" : [ "PARTITION clause cannot contain a non-partition column 
name: %s" ],
     "sqlState" : "42000"
   },
+  "NO_HANDLER_FOR_UDAF" : {
+    "message" : [ "No handler for UDAF '%s'. Use 
sparkSession.udf.register(...) instead." ]
+  },
+  "NO_UDF_INTERFACE_ERROR" : {
+    "message" : [ "UDF class %s doesn't implement any UDF interface" ]
+  },
   "PARSE_CHAR_MISSING_LENGTH" : {
     "message" : [ "DataType %s requires a length parameter, for example 
%s(10). Please specify the length." ],
     "sqlState" : "42000"
@@ -187,6 +196,9 @@
   "UNSUPPORTED_OPERATION" : {
     "message" : [ "The operation is not supported: %s" ]
   },
+  "UNTYPED_SCALA_UDF" : {
+    "message" : [ "You're using untyped Scala UDF, which does not have the 
input type information. Spark may blindly pass null to the Scala closure with 
primitive-type argument, and the closure will see the default value of the Java 
type for the null argument, e.g. `udf((x: Int) => x, IntegerType)`, the result 
is 0 for null input. To get rid of this error, you could:\n1. use typed Scala 
UDF APIs(without return type parameter), e.g. `udf((x: Int) => x)`\n2. use Java 
UDF APIs, e.g. `udf(ne [...]
+  },
   "WRITING_JOB_ABORTED" : {
     "message" : [ "Writing job aborted" ],
     "sqlState" : "40000"
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala
index bc02efd5113..0e5e52a9c90 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala
@@ -17,12 +17,18 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import org.apache.spark.SparkThrowableHelper
 import org.apache.spark.sql.AnalysisException
 
 /**
  * Thrown when a query failed for invalid function class, usually because a SQL
  * function's class does not follow the rules of the UDF/UDAF/UDTF class 
definition.
  */
-class InvalidUDFClassException private[sql](message: String)
-  extends AnalysisException(message, None, None, None, None) {
+class InvalidUDFClassException private[sql](
+    message: String,
+    errorClass: Option[String] = None)
+  extends AnalysisException(message = message, errorClass = errorClass) {
+
+  def this(errorClass: String, messageParameters: Array[String]) =
+    this(SparkThrowableHelper.getMessage(errorClass, messageParameters), 
Some(errorClass))
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 57ed7da7b20..b6b64804904 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -764,8 +764,9 @@ object QueryCompilationErrors {
   }
 
   def noHandlerForUDAFError(name: String): Throwable = {
-    new InvalidUDFClassException(s"No handler for UDAF '$name'. " +
-      "Use sparkSession.udf.register(...) instead.")
+    new InvalidUDFClassException(
+      errorClass = "NO_HANDLER_FOR_UDAF",
+      messageParameters = Array(name))
   }
 
   def batchWriteCapabilityError(
@@ -1357,12 +1358,6 @@ object QueryCompilationErrors {
        """.stripMargin.replaceAll("\n", " "))
   }
 
-  def unexpectedEvalTypesForUDFsError(evalTypes: Set[Int]): Throwable = {
-    new AnalysisException(
-      s"Expected udfs have the same evalType but got different evalTypes: " +
-        s"${evalTypes.mkString(",")}")
-  }
-
   def ambiguousFieldNameError(
       fieldName: Seq[String], numMatches: Int, context: Origin): Throwable = {
     new AnalysisException(
@@ -2264,17 +2259,9 @@ object QueryCompilationErrors {
   }
 
   def usingUntypedScalaUDFError(): Throwable = {
-    new AnalysisException("You're using untyped Scala UDF, which does not have 
the input type " +
-      "information. Spark may blindly pass null to the Scala closure with 
primitive-type " +
-      "argument, and the closure will see the default value of the Java type 
for the null " +
-      "argument, e.g. `udf((x: Int) => x, IntegerType)`, the result is 0 for 
null input. " +
-      "To get rid of this error, you could:\n" +
-      "1. use typed Scala UDF APIs(without return type parameter), e.g. 
`udf((x: Int) => x)`\n" +
-      "2. use Java UDF APIs, e.g. `udf(new UDF1[String, Integer] { " +
-      "override def call(s: String): Integer = s.length() }, IntegerType)`, " +
-      "if input types are all non primitive\n" +
-      s"3. set ${SQLConf.LEGACY_ALLOW_UNTYPED_SCALA_UDF.key} to true and " +
-      s"use this API with caution")
+    new AnalysisException(
+      errorClass = "UNTYPED_SCALA_UDF",
+      messageParameters = Array.empty)
   }
 
   def aggregationFunctionAppliedOnNonNumericColumnError(colName: String): 
Throwable = {
@@ -2310,16 +2297,21 @@ object QueryCompilationErrors {
   }
 
   def udfClassDoesNotImplementAnyUDFInterfaceError(className: String): 
Throwable = {
-    new AnalysisException(s"UDF class $className doesn't implement any UDF 
interface")
+    new AnalysisException(
+      errorClass = "NO_UDF_INTERFACE_ERROR",
+      messageParameters = Array(className))
   }
 
-  def udfClassNotAllowedToImplementMultiUDFInterfacesError(className: String): 
Throwable = {
+  def udfClassImplementMultiUDFInterfacesError(className: String): Throwable = 
{
     new AnalysisException(
-      s"It is invalid to implement multiple UDF interfaces, UDF class 
$className")
+      errorClass = "MULTI_UDF_INTERFACE_ERROR",
+      messageParameters = Array(className))
   }
 
   def udfClassWithTooManyTypeArgumentsError(n: Int): Throwable = {
-    new AnalysisException(s"UDF class with $n type arguments is not 
supported.")
+    new AnalysisException(
+      errorClass = "UNSUPPORTED_FEATURE",
+      messageParameters = Array(s"UDF class with $n type arguments"))
   }
 
   def classWithoutPublicNonArgumentConstructorError(className: String): 
Throwable = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
index 366aa72a409..fb7323a48cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala
@@ -661,7 +661,7 @@ class UDFRegistration private[sql] (functionRegistry: 
FunctionRegistry) extends
       if (udfInterfaces.length == 0) {
         throw 
QueryCompilationErrors.udfClassDoesNotImplementAnyUDFInterfaceError(className)
       } else if (udfInterfaces.length > 1) {
-        throw 
QueryCompilationErrors.udfClassNotAllowedToImplementMultiUDFInterfacesError(className)
+        throw 
QueryCompilationErrors.udfClassImplementMultiUDFInterfacesError(className)
       } else {
         try {
           val udf = clazz.getConstructor().newInstance()
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index a809ea07d0e..fb90187a8b1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -26,7 +26,6 @@ import 
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern._
-import org.apache.spark.sql.errors.QueryCompilationErrors
 
 
 /**
@@ -264,7 +263,9 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with 
PredicateHelper {
 
           val evalTypes = validUdfs.map(_.evalType).toSet
           if (evalTypes.size != 1) {
-            throw 
QueryCompilationErrors.unexpectedEvalTypesForUDFsError(evalTypes)
+            throw new IllegalStateException(
+              "Expected udfs have the same evalType but got different 
evalTypes: " +
+              evalTypes.mkString(","))
           }
           val evalType = evalTypes.head
           val evaluation = evalType match {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala
 b/sql/core/src/test/java/org/apache/spark/sql/api/java/UDF23Test.java
similarity index 58%
copy from 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala
copy to sql/core/src/test/java/org/apache/spark/sql/api/java/UDF23Test.java
index bc02efd5113..545af5987b9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InvalidUDFClassException.scala
+++ b/sql/core/src/test/java/org/apache/spark/sql/api/java/UDF23Test.java
@@ -15,14 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.catalyst.catalog
+package org.apache.spark.sql.api.java;
 
-import org.apache.spark.sql.AnalysisException
+import java.io.Serializable;
+
+import org.apache.spark.annotation.Stable;
 
 /**
- * Thrown when a query failed for invalid function class, usually because a SQL
- * function's class does not follow the rules of the UDF/UDAF/UDTF class 
definition.
+ * A Spark SQL UDF that has 23 arguments for test.
  */
-class InvalidUDFClassException private[sql](message: String)
-  extends AnalysisException(message, None, None, None, None) {
+@Stable
+public interface UDF23Test<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12,
+    T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, R> extends 
Serializable {
+
+  R call(
+      T1 t1, T2 t2, T3 t3, T4 t4, T5 t5, T6 t6, T7 t7, T8 t8, T9 t9, T10 t10,
+      T11 t11, T12 t12, T13 t13, T14 t14, T15 t15, T16 t16, T17 t17, T18 t18,
+      T19 t19, T20 t20, T21 t21, T22 t22, T23 t23) throws Exception;
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 1393eba40b7..8fd7aa3e0cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -471,11 +471,6 @@ class UDFSuite extends QueryTest with SharedSparkSession {
 
   }
 
-  test("use untyped Scala UDF should fail by default") {
-    val e = intercept[AnalysisException](udf((x: Int) => x, IntegerType))
-    assert(e.getMessage.contains("You're using untyped Scala UDF"))
-  }
-
   test("SPARK-26308: udf with decimal") {
     val df1 = spark.createDataFrame(
       sparkContext.parallelize(Seq(Row(new 
BigDecimal("2011000000000002456556")))),
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
index 4d776caacf3..e9a86df7f07 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/errors/QueryCompilationErrorsSuite.scala
@@ -18,9 +18,12 @@
 package org.apache.spark.sql.errors
 
 import org.apache.spark.sql.{AnalysisException, IntegratedUDFTestUtils, 
QueryTest}
-import org.apache.spark.sql.functions.{grouping, grouping_id, sum}
+import org.apache.spark.sql.api.java.{UDF1, UDF2, UDF23Test}
+import org.apache.spark.sql.expressions.SparkUserDefinedFunction
+import org.apache.spark.sql.functions.{grouping, grouping_id, sum, udf}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, StringType}
 
 case class StringLongClass(a: String, b: Long)
 
@@ -171,4 +174,113 @@ class QueryCompilationErrorsSuite extends QueryTest with 
SharedSparkSession {
       "The feature is not supported: " +
       "Pandas UDF aggregate expressions don't support pivot.")
   }
+
+  test("NO_HANDLER_FOR_UDAF: No handler for UDAF error") {
+    val functionName = "myCast"
+    withUserDefinedFunction(functionName -> true) {
+      sql(
+        s"""
+          |CREATE TEMPORARY FUNCTION $functionName
+          |AS 'org.apache.spark.sql.errors.MyCastToString'
+          |""".stripMargin)
+
+      val e = intercept[AnalysisException] (
+        sql(s"SELECT $functionName(123) as value")
+      )
+
+      assert(e.errorClass === Some("NO_HANDLER_FOR_UDAF"))
+      assert(e.message ===
+        "No handler for UDAF 'org.apache.spark.sql.errors.MyCastToString'. " +
+        "Use sparkSession.udf.register(...) instead.")
+    }
+  }
+
+  test("UNTYPED_SCALA_UDF: use untyped Scala UDF should fail by default") {
+    val e = intercept[AnalysisException](udf((x: Int) => x, IntegerType))
+
+    assert(e.errorClass === Some("UNTYPED_SCALA_UDF"))
+    assert(e.message ===
+      "You're using untyped Scala UDF, which does not have the input type " +
+      "information. Spark may blindly pass null to the Scala closure with 
primitive-type " +
+      "argument, and the closure will see the default value of the Java type 
for the null " +
+      "argument, e.g. `udf((x: Int) => x, IntegerType)`, the result is 0 for 
null input. " +
+      "To get rid of this error, you could:\n" +
+      "1. use typed Scala UDF APIs(without return type parameter), e.g. 
`udf((x: Int) => x)`\n" +
+      "2. use Java UDF APIs, e.g. `udf(new UDF1[String, Integer] { " +
+      "override def call(s: String): Integer = s.length() }, IntegerType)`, " +
+      "if input types are all non primitive\n" +
+      s"3. set ${SQLConf.LEGACY_ALLOW_UNTYPED_SCALA_UDF.key} to true and " +
+      s"use this API with caution")
+  }
+
+  test("NO_UDF_INTERFACE_ERROR: java udf class does not implement any udf 
interface") {
+    val className = "org.apache.spark.sql.errors.MyCastToString"
+    val e = intercept[AnalysisException](
+      spark.udf.registerJava(
+        "myCast",
+        className,
+        StringType)
+    )
+
+    assert(e.errorClass === Some("NO_UDF_INTERFACE_ERROR"))
+    assert(e.message ===
+      s"UDF class $className doesn't implement any UDF interface")
+  }
+
+  test("MULTI_UDF_INTERFACE_ERROR: java udf implement multi UDF interface") {
+    val className = "org.apache.spark.sql.errors.MySum"
+    val e = intercept[AnalysisException](
+      spark.udf.registerJava(
+        "mySum",
+        className,
+        StringType)
+    )
+
+    assert(e.errorClass === Some("MULTI_UDF_INTERFACE_ERROR"))
+    assert(e.message ===
+      s"Not allowed to implement multiple UDF interfaces, UDF class 
$className")
+  }
+
+  test("UNSUPPORTED_FEATURE: java udf with too many type arguments") {
+    val className = "org.apache.spark.sql.errors.MultiIntSum"
+    val e = intercept[AnalysisException](
+      spark.udf.registerJava(
+        "mySum",
+        className,
+        StringType)
+    )
+
+    assert(e.errorClass === Some("UNSUPPORTED_FEATURE"))
+    assert(e.getSqlState === "0A000")
+    assert(e.message === "The feature is not supported: UDF class with 24 type 
arguments")
+  }
+}
+
+class MyCastToString extends SparkUserDefinedFunction(
+  (input: Any) => if (input == null) {
+    null
+  } else {
+    input.toString
+  },
+  StringType,
+  inputEncoders = Seq.fill(1)(None))
+
+class MySum extends UDF1[Int, Int] with UDF2[Int, Int, Int] {
+  override def call(t1: Int): Int = t1
+
+  override def call(t1: Int, t2: Int): Int = t1 + t2
+}
+
+class MultiIntSum extends
+  UDF23Test[Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int,
+    Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int, Int] {
+  // scalastyle:off argcount
+  override def call(
+      t1: Int, t2: Int, t3: Int, t4: Int, t5: Int, t6: Int, t7: Int, t8: Int,
+      t9: Int, t10: Int, t11: Int, t12: Int, t13: Int, t14: Int, t15: Int, 
t16: Int,
+      t17: Int, t18: Int, t19: Int, t20: Int, t21: Int, t22: Int, t23: Int): 
Int = {
+    t1 + t2 + t3 + t4 + t5 + t6 + t7 + t8 + t9 + t10 +
+      t11 + t12 + t13 + t14 + t15 + t16 + t17 + t18 + t19 + t20 + t21 + t22 + 
t23
+  }
+  // scalastyle:on argcount
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to