This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 07cbba64e3ea [SPARK-48706][PYTHON] Python UDF in higher order functions should not throw internal error
07cbba64e3ea is described below
commit 07cbba64e3ea82c169bfaa02b3a92e91207919b1
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Wed Jun 26 13:39:53 2024 +0800
[SPARK-48706][PYTHON] Python UDF in higher order functions should not throw internal error
### What changes were proposed in this pull request?
This PR fixes the error messages and classes when Python UDFs are used in
higher order functions.
### Why are the changes needed?
To show the proper user-facing exceptions with error classes.
### Does this PR introduce _any_ user-facing change?
Yes, previously it threw an internal error such as:
```python
from pyspark.sql.functions import transform, udf, col, array
spark.range(1).select(transform(array("id"), lambda x: udf(lambda y: y)(x))).collect()
```
Before:
```
py4j.protocol.Py4JJavaError: An error occurred while calling o74.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 0.0 failed 1 times, most recent failure: Lost task 15.0 in stage 0.0 (TID 15) (ip-192-168-123-103.ap-northeast-2.compute.internal executor driver): org.apache.spark.SparkException: [INTERNAL_ERROR] Cannot evaluate expression: <lambda>(lambda x_0#3L)#2 SQLSTATE: XX000
	at org.apache.spark.SparkException$.internalError(SparkException.scala:92)
	at org.apache.spark.SparkException$.internalError(SparkException.scala:96)
```
After:
```
pyspark.errors.exceptions.captured.AnalysisException: [UNSUPPORTED_FEATURE.LAMBDA_FUNCTION_WITH_PYTHON_UDF] The feature is not supported: Lambda function with Python UDF "<lambda>(lambda x_0#3L)" in a higher order function. SQLSTATE: 0A000;
Project [transform(array(id#0L), lambdafunction(<lambda>(lambda x_0#3L)#2, lambda x_0#3L, false)) AS transform(array(id), lambdafunction(<lambda>(lambda x_0#3L), namedlambdavariable()))#4]
+- Range (0, 1, step=1, splits=Some(16))
```
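
For users hitting the new error, one possible workaround is to move the Python UDF out of the lambda, e.g. by writing a single UDF over the whole array column. A minimal sketch, not part of this patch: `increment_all` is a hypothetical example UDF, and a running SparkSession named `spark` is assumed.

```python
# Hedged workaround sketch (not part of this patch): since a Python UDF cannot
# run inside a higher-order function's lambda, apply one UDF to the whole
# array and iterate over its elements in Python instead.
from pyspark.sql.functions import array, udf

# `increment_all` is a hypothetical example UDF over the entire array column.
increment_all = udf(lambda arr: [y + 1 for y in arr], "array<bigint>")

spark.range(1).select(increment_all(array("id"))).collect()
# -> one row whose array column holds the incremented elements, e.g. [1]
```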
### How was this patch tested?
A unit test was added.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47079 from HyukjinKwon/SPARK-48706.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../utils/src/main/resources/error/error-conditions.json | 5 +++++
.../spark/sql/catalyst/analysis/CheckAnalysis.scala | 8 ++++++++
.../spark/sql/execution/python/PythonUDFSuite.scala | 16 ++++++++++++++--
3 files changed, 27 insertions(+), 2 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index bf251f057af5..72f358f87d62 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -4482,6 +4482,11 @@
"INSERT INTO <tableName> with IF NOT EXISTS in the PARTITION spec."
]
},
+ "LAMBDA_FUNCTION_WITH_PYTHON_UDF" : {
+ "message" : [
+ "Lambda function with Python UDF <funcName> in a higher order
function."
+ ]
+ },
"LATERAL_COLUMN_ALIAS_IN_AGGREGATE_FUNC" : {
"message" : [
"Referencing a lateral column alias <lca> in the aggregate function
<aggFunc>."
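
As a rough illustration of how this new entry surfaces (a sketch, not Spark's actual rendering code): the `<funcName>` placeholder is filled from the `messageParameters` passed at the failure site, and the sub-condition inherits the `UNSUPPORTED_FEATURE` prefix and SQLSTATE:

```python
# Hedged illustration (not Spark's rendering code): how the new template in
# error-conditions.json expands once <funcName> is substituted.
template = "Lambda function with Python UDF <funcName> in a higher order function."
params = {"funcName": '"<lambda>(lambda x_0#3L)"'}

message = template
for name, value in params.items():
    message = message.replace(f"<{name}>", value)

# The parent class UNSUPPORTED_FEATURE contributes the prefix and SQLSTATE 0A000.
print("[UNSUPPORTED_FEATURE.LAMBDA_FUNCTION_WITH_PYTHON_UDF] "
      "The feature is not supported: " + message + " SQLSTATE: 0A000")
```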
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index bd8f8fe9f652..9f3eee5198a1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -254,6 +254,14 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
           hof.invalidFormat(checkRes)
         }

+      case hof: HigherOrderFunction
+          if hof.resolved && hof.functions
+            .exists(_.exists(_.isInstanceOf[PythonUDF])) =>
+        val u = hof.functions.flatMap(_.find(_.isInstanceOf[PythonUDF])).head
+        hof.failAnalysis(
+          errorClass = "UNSUPPORTED_FEATURE.LAMBDA_FUNCTION_WITH_PYTHON_UDF",
+          messageParameters = Map("funcName" -> toSQLExpr(u)))
+
       // If an attribute can't be resolved as a map key of string type, either the key should be
       // surrounded with single quotes, or there is a typo in the attribute name.
       case GetMapValue(map, key: Attribute) if isMapWithStringKey(map) && !key.resolved =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
index 3101281251b1..2e56ad0ab416 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala
@@ -17,8 +17,8 @@

 package org.apache.spark.sql.execution.python

-import org.apache.spark.sql.{IntegratedUDFTestUtils, QueryTest}
-import org.apache.spark.sql.functions.count
+import org.apache.spark.sql.{AnalysisException, IntegratedUDFTestUtils, QueryTest}
+import org.apache.spark.sql.functions.{array, count, transform}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.LongType
@@ -112,4 +112,16 @@ class PythonUDFSuite extends QueryTest with SharedSparkSession {
     val pandasTestUDF = TestGroupedAggPandasUDF(name = udfName)
     assert(df.agg(pandasTestUDF(df("id"))).schema.fieldNames.exists(_.startsWith(udfName)))
   }
+
+  test("SPARK-48706: Negative test case for Python UDF in higher order functions") {
+    assume(shouldTestPythonUDFs)
+    checkError(
+      exception = intercept[AnalysisException] {
+        spark.range(1).select(transform(array("id"), x => pythonTestUDF(x))).collect()
+      },
+      errorClass = "UNSUPPORTED_FEATURE.LAMBDA_FUNCTION_WITH_PYTHON_UDF",
+      parameters = Map("funcName" -> "\"pyUDF(namedlambdavariable())\""),
+      context = ExpectedContext(
+        "transform", s".*${this.getClass.getSimpleName}.*"))
+  }
 }
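
On the Python side, the same condition can be asserted programmatically. A minimal sketch, not part of this patch, assuming a running SparkSession named `spark`:

```python
# Hedged sketch (not part of this patch): the new check surfaces in PySpark as
# an AnalysisException whose error class can be inspected programmatically.
from pyspark.errors import AnalysisException
from pyspark.sql.functions import array, transform, udf

try:
    spark.range(1).select(
        transform(array("id"), lambda x: udf(lambda y: y)(x))
    ).collect()
except AnalysisException as e:
    assert e.getErrorClass() == "UNSUPPORTED_FEATURE.LAMBDA_FUNCTION_WITH_PYTHON_UDF"
```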
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]