This is an automated email from the ASF dual-hosted git repository.
ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 31f85e5ff77c [SPARK-46966][PYTHON] Add UDTF API for 'analyze' method
to indicate subset of input table columns to select
31f85e5ff77c is described below
commit 31f85e5ff77c9f9b704160c3e70849a488a4f40d
Author: Daniel Tenedorio <[email protected]>
AuthorDate: Wed Feb 7 13:42:44 2024 -0800
[SPARK-46966][PYTHON] Add UDTF API for 'analyze' method to indicate subset
of input table columns to select
### What changes were proposed in this pull request?
This PR adds a UDTF API for the 'analyze' method to indicate the subset of
input table columns to select.
For example, this UDTF populates this 'select' list to indicate that Spark
should only return two input columns from the input table: 'input' and
'partition_col':
```
from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
    SelectedColumn,
)
from pyspark.sql.types import IntegerType, Row, StructType

class Udtf:
    def __init__(self):
        self._partition_col = None
        self._count = 0
        self._sum = 0
        self._last = None

    @staticmethod
    def analyze(row: Row):
        return AnalyzeResult(
            schema=StructType()
                .add("user_id", IntegerType())
                .add("count", IntegerType())
                .add("total", IntegerType())
                .add("last", IntegerType()),
            partitionBy=[
                PartitioningColumn("user_id")
            ],
            orderBy=[
                OrderingColumn("timestamp")
            ],
            select=[
                SelectedColumn("input"),
                SelectedColumn("partition_col")
            ])

    def eval(self, row: Row):
        self._partition_col = row["partition_col"]
        self._count += 1
        self._last = row["input"]
        self._sum += row["input"]

    def terminate(self):
        yield self._partition_col, self._count, self._sum, self._last
```
### Why are the changes needed?
This can reduce the amount of data sent between the JVM and Python
interpreter, improving performance.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
This PR adds test coverage.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #45007 from dtenedor/udtf-select-cols.
Authored-by: Daniel Tenedorio <[email protected]>
Signed-off-by: Takuya UESHIN <[email protected]>
---
.../src/main/resources/error/error-classes.json | 6 ++
docs/sql-error-conditions.md | 6 ++
python/pyspark/sql/functions/builtin.py | 2 +-
python/pyspark/sql/udtf.py | 32 ++++++-
python/pyspark/sql/worker/analyze_udtf.py | 20 ++++-
.../FunctionTableSubqueryArgumentExpression.scala | 21 ++++-
.../spark/sql/catalyst/expressions/PythonUDF.scala | 27 +++++-
.../spark/sql/errors/QueryCompilationErrors.scala | 6 ++
.../python/UserDefinedPythonFunction.scala | 15 +++-
.../sql-tests/analyzer-results/udtf/udtf.sql.out | 79 +++++++++++++++++
.../test/resources/sql-tests/inputs/udtf/udtf.sql | 6 ++
.../resources/sql-tests/results/udtf/udtf.sql.out | 98 ++++++++++++++++++++++
.../apache/spark/sql/IntegratedUDFTestUtils.scala | 62 ++++++++++++--
13 files changed, 367 insertions(+), 13 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-classes.json
b/common/utils/src/main/resources/error/error-classes.json
index e9dc715cf328..4fcf9248d3e2 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -3456,6 +3456,12 @@
],
"sqlState" : "42802"
},
+
"UDTF_INVALID_REQUESTED_SELECTED_EXPRESSION_FROM_ANALYZE_METHOD_REQUIRES_ALIAS"
: {
+ "message" : [
+ "Failed to evaluate the user-defined table function because its
'analyze' method returned a requested 'select' expression (<expression>) that
does not include a corresponding alias; please update the UDTF to specify an
alias there and then try the query again."
+ ],
+ "sqlState" : "42802"
+ },
"UNABLE_TO_ACQUIRE_MEMORY" : {
"message" : [
"Unable to acquire <requestedBytes> bytes of memory, got
<receivedBytes>."
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 7c51a6bcc145..f03a98e03d24 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -2215,6 +2215,12 @@ Please ensure that the number of aliases provided
matches the number of columns
Failed to evaluate the user-defined table function because its 'analyze'
method returned a requested OrderingColumn whose column name expression
included an unnecessary alias `<aliasName>`; please remove this alias and then
try the query again.
+###
UDTF_INVALID_REQUESTED_SELECTED_EXPRESSION_FROM_ANALYZE_METHOD_REQUIRES_ALIAS
+
+[SQLSTATE:
42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Failed to evaluate the user-defined table function because its 'analyze'
method returned a requested 'select' expression (`<expression>`) that does not
include a corresponding alias; please update the UDTF to specify an alias there
and then try the query again.
+
### UNABLE_TO_ACQUIRE_MEMORY
[SQLSTATE:
53200](sql-error-conditions-sqlstates.html#class-53-insufficient-resources)
diff --git a/python/pyspark/sql/functions/builtin.py
b/python/pyspark/sql/functions/builtin.py
index cb872fdb8180..110006df4317 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -50,7 +50,7 @@ from pyspark.sql.types import ArrayType, DataType,
StringType, StructType, _from
# Keep UserDefinedFunction import for backwards compatible import; moved in
SPARK-22409
from pyspark.sql.udf import UserDefinedFunction, _create_py_udf # noqa: F401
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult # noqa: F401
-from pyspark.sql.udtf import OrderingColumn, PartitioningColumn # noqa: F401
+from pyspark.sql.udtf import OrderingColumn, PartitioningColumn,
SelectedColumn # noqa: F401
from pyspark.sql.udtf import SkipRestOfInputTableException # noqa: F401
from pyspark.sql.udtf import UserDefinedTableFunction, _create_py_udtf
diff --git a/python/pyspark/sql/udtf.py b/python/pyspark/sql/udtf.py
index fe3b0dc54314..c4ded5af71e4 100644
--- a/python/pyspark/sql/udtf.py
+++ b/python/pyspark/sql/udtf.py
@@ -43,6 +43,7 @@ __all__ = [
"AnalyzeResult",
"PartitioningColumn",
"OrderingColumn",
+ "SelectedColumn",
"SkipRestOfInputTableException",
"UDTFRegistration",
]
@@ -108,6 +109,26 @@ class OrderingColumn:
overrideNullsFirst: Optional[bool] = None
+@dataclass(frozen=True)
+class SelectedColumn:
+ """
+ Represents an expression that the UDTF is specifying for Catalyst to
evaluate against the
+ columns in the input TABLE argument. The UDTF then receives one input
column for each expression
+ in the list, in the order they are listed.
+
+ Parameters
+ ----------
+ name : str
+ The contents of the selected column name or expression represented as
a SQL string.
+ alias : str, default ''
+ If non-empty, this is the alias for the column or expression as
visible from the UDTF's
+ 'eval' method. This is required if the expression is not a simple
column reference.
+ """
+
+ name: str
+ alias: str = ""
+
+
# Note: this class is a "dataclass" for purposes of convenience, but it is not
marked "frozen"
# because the intention is that users may create subclasses of it for purposes
of returning custom
# information from the "analyze" method.
@@ -118,13 +139,13 @@ class AnalyzeResult:
Parameters
----------
- schema : :class:`StructType`
+ schema: :class:`StructType`
The schema that the Python UDTF will return.
- withSinglePartition : bool
+ withSinglePartition: bool
If true, the UDTF is specifying for Catalyst to repartition all rows
of the input TABLE
argument to one collection for consumption by exactly one instance of
the correpsonding
UDTF class.
- partitionBy : sequence of :class:`PartitioningColumn`
+ partitionBy: sequence of :class:`PartitioningColumn`
If non-empty, this is a sequence of expressions that the UDTF is
specifying for Catalyst to
partition the input TABLE argument by. In this case, calls to the UDTF
may not include any
explicit PARTITION BY clause, in which case Catalyst will return an
error. This option is
@@ -133,12 +154,17 @@ class AnalyzeResult:
If non-empty, this is a sequence of expressions that the UDTF is
specifying for Catalyst to
sort the input TABLE argument by. Note that the 'partitionBy' list
must also be non-empty
in this case.
+ select: sequence of :class:`SelectedColumn`
+ If non-empty, this is a sequence of expressions that the UDTF is
specifying for Catalyst to
+ evaluate against the columns in the input TABLE argument. The UDTF
then receives one input
+ attribute for each name in the list, in the order they are listed.
"""
schema: StructType
withSinglePartition: bool = False
partitionBy: Sequence[PartitioningColumn] = field(default_factory=tuple)
orderBy: Sequence[OrderingColumn] = field(default_factory=tuple)
+ select: Sequence[SelectedColumn] = field(default_factory=tuple)
class SkipRestOfInputTableException(Exception):
diff --git a/python/pyspark/sql/worker/analyze_udtf.py
b/python/pyspark/sql/worker/analyze_udtf.py
index ce21e4859770..f61330b806cd 100644
--- a/python/pyspark/sql/worker/analyze_udtf.py
+++ b/python/pyspark/sql/worker/analyze_udtf.py
@@ -31,7 +31,7 @@ from pyspark.serializers import (
write_with_length,
SpecialLengths,
)
-from pyspark.sql.functions import PartitioningColumn
+from pyspark.sql.functions import PartitioningColumn, SelectedColumn
from pyspark.sql.types import _parse_datatype_json_string, StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
from pyspark.util import handle_worker_exception
@@ -203,6 +203,19 @@ def main(infile: IO, outfile: IO) -> None:
and then try the query again."""
)
)
+ elif isinstance(result.select, (list, tuple)) and (
+ len(result.select) > 0
+ and not all([isinstance(val, SelectedColumn) for val in
result.select])
+ ):
+ raise PySparkValueError(
+ format_error(
+ f"""
+ {error_prefix} because the static 'analyze' method
returned an
+ 'AnalyzeResult' object with the 'select' field set to a
value besides a
+ list or tuple of 'SelectedColumn' objects. Please update
the table function
+ and then try the query again."""
+ )
+ )
# Return the analyzed schema.
write_with_length(result.schema.json().encode("utf-8"), outfile)
@@ -225,6 +238,11 @@ def main(infile: IO, outfile: IO) -> None:
write_int(1, outfile)
else:
write_int(2, outfile)
+ # Return the requested selected input table columns, if specified.
+ write_int(len(result.select), outfile)
+ for col in result.select:
+ write_with_length(col.name.encode("utf-8"), outfile)
+ write_with_length(col.alias.encode("utf-8"), outfile)
except BaseException as e:
handle_worker_exception(e, outfile)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala
index da2dd9b1e256..3c60e1dfb2c3 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, LogicalPlan,
Project, Repartition, RepartitionByExpression, Sort}
import
org.apache.spark.sql.catalyst.trees.TreePattern.{FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION,
TreePattern}
+import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.DataType
/**
@@ -58,6 +59,10 @@ import org.apache.spark.sql.types.DataType
* @param orderByExpressions if non-empty, the TABLE argument included the
ORDER BY clause to
* indicate that the rows within each partition of
the table function are
* to arrive in the provided order.
+ * @param selectedInputExpressions If non-empty, this is a sequence of
expressions that the UDTF is
+ * specifying for Catalyst to evaluate against
the columns in the
+ * input TABLE argument. The UDTF then
receives one input attribute
+ * for each name in the list, in the order
they are listed.
*/
case class FunctionTableSubqueryArgumentExpression(
plan: LogicalPlan,
@@ -65,7 +70,8 @@ case class FunctionTableSubqueryArgumentExpression(
exprId: ExprId = NamedExpression.newExprId,
partitionByExpressions: Seq[Expression] = Seq.empty,
withSinglePartition: Boolean = false,
- orderByExpressions: Seq[SortOrder] = Seq.empty)
+ orderByExpressions: Seq[SortOrder] = Seq.empty,
+ selectedInputExpressions: Seq[PythonUDTFSelectedExpression] = Seq.empty)
extends SubqueryExpression(plan, outerAttrs, exprId, Seq.empty, None) with
Unevaluable {
assert(!(withSinglePartition && partitionByExpressions.nonEmpty),
@@ -134,6 +140,19 @@ case class FunctionTableSubqueryArgumentExpression(
child = subquery)
}
}
+ // If instructed, add a projection to compute the specified input
expressions.
+ if (selectedInputExpressions.nonEmpty) {
+ val projectList: Seq[NamedExpression] = selectedInputExpressions.map {
+ case PythonUDTFSelectedExpression(expression: Expression, Some(alias:
String)) =>
+ Alias(expression, alias)()
+ case PythonUDTFSelectedExpression(a: Attribute, None) =>
+ a
+ case PythonUDTFSelectedExpression(other: Expression, None) =>
+ throw QueryCompilationErrors
+ .invalidUDTFSelectExpressionFromAnalyzeMethodNeedsAlias(other.sql)
+ } ++ extraProjectedPartitioningExpressions
+ subquery = Project(projectList, subquery)
+ }
Project(Seq(Alias(CreateStruct(subquery.output), "c")()), subquery)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
index f886b50e8a23..b38d161be383 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
@@ -246,6 +246,12 @@ case class UnresolvedPolymorphicPythonUDTF(
* @param orderByExpressions if non-empty, this contains the list of ordering
items that the
* 'analyze' method explicitly indicated that the
UDTF call should consume
* the input table rows by
+ * @param selectedInputExpressions If non-empty, this is a list of expressions
that the UDTF is
+ * specifying for Catalyst to evaluate against
the columns in the
+ * input TABLE argument. In this case,
Catalyst will insert a
+ * projection to evaluate these expressions
and return the result to
+ * the UDTF. The UDTF then receives one input
column for each
+ * expression in the list, in the order they
are listed.
* @param pickledAnalyzeResult this is the pickled 'AnalyzeResult' instance
from the UDTF, which
* contains all metadata returned by the Python
UDTF 'analyze' method
* including the result schema of the function
call as well as optional
@@ -256,6 +262,7 @@ case class PythonUDTFAnalyzeResult(
withSinglePartition: Boolean,
partitionByExpressions: Seq[Expression],
orderByExpressions: Seq[SortOrder],
+ selectedInputExpressions: Seq[PythonUDTFSelectedExpression],
pickledAnalyzeResult: Array[Byte]) {
/**
* Applies the requested properties from this analysis result to the target
TABLE argument
@@ -291,6 +298,7 @@ case class PythonUDTFAnalyzeResult(
var newWithSinglePartition = t.withSinglePartition
var newPartitionByExpressions = t.partitionByExpressions
var newOrderByExpressions = t.orderByExpressions
+ var newSelectedInputExpressions = t.selectedInputExpressions
if (withSinglePartition) {
newWithSinglePartition = true
}
@@ -300,13 +308,30 @@ case class PythonUDTFAnalyzeResult(
if (orderByExpressions.nonEmpty) {
newOrderByExpressions = orderByExpressions
}
+ if (selectedInputExpressions.nonEmpty) {
+ newSelectedInputExpressions = selectedInputExpressions
+ }
t.copy(
withSinglePartition = newWithSinglePartition,
partitionByExpressions = newPartitionByExpressions,
- orderByExpressions = newOrderByExpressions)
+ orderByExpressions = newOrderByExpressions,
+ selectedInputExpressions = newSelectedInputExpressions)
}
}
+/**
+ * Represents an expression that the UDTF is specifying for Catalyst to
evaluate against the
+ * columns in the input TABLE argument. The UDTF then receives one input
column for each expression
+ * in the list, in the order they are listed.
+ *
+ * @param expression the expression that the UDTF is specifying for Catalyst
to evaluate against the
+ * columns in the input TABLE argument
+ * @param alias If present, this is the alias for the column or expression as
visible from the
+ * UDTF's 'eval' method. This is required if the expression is
not a simple column
+ * reference.
+ */
+case class PythonUDTFSelectedExpression(expression: Expression, alias:
Option[String])
+
/**
* A place holder used when printing expressions without debugging information
such as the
* result id.
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index f0cbe727edfe..46028817e8eb 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -689,6 +689,12 @@ private[sql] object QueryCompilationErrors extends
QueryErrorsBase with Compilat
messageParameters = Map("aliasName" -> aliasName))
}
+ def invalidUDTFSelectExpressionFromAnalyzeMethodNeedsAlias(expression:
String): Throwable = {
+ new AnalysisException(
+ errorClass =
"UDTF_INVALID_REQUESTED_SELECTED_EXPRESSION_FROM_ANALYZE_METHOD_REQUIRES_ALIAS",
+ messageParameters = Map("expression" -> expression))
+ }
+
def windowAggregateFunctionWithFilterNotSupportedError(): Throwable = {
new AnalysisException(
errorClass = "_LEGACY_ERROR_TEMP_1030",
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala
index 6bb1992a064a..32406460ad71 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala
@@ -25,7 +25,7 @@ import net.razorvine.pickle.Pickler
import org.apache.spark.api.python.{PythonEvalType, PythonFunction,
PythonWorkerUtils, SpecialLengths}
import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending,
Descending, Expression, FunctionTableSubqueryArgumentExpression,
NamedArgumentExpression, NullsFirst, NullsLast, PythonUDAF, PythonUDF,
PythonUDTF, PythonUDTFAnalyzeResult, SortOrder, UnresolvedPolymorphicPythonUDTF}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending,
Descending, Expression, FunctionTableSubqueryArgumentExpression,
NamedArgumentExpression, NullsFirst, NullsLast, PythonUDAF, PythonUDF,
PythonUDTF, PythonUDTFAnalyzeResult, PythonUDTFSelectedExpression, SortOrder,
UnresolvedPolymorphicPythonUDTF}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan,
NamedParametersSupport, OneRowRelation}
import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -276,11 +276,24 @@ class UserDefinedPythonTableFunctionAnalyzeRunner(
case 2 => orderBy.append(SortOrder(parsed, direction, NullsLast,
Seq.empty))
}
}
+ // Receive the list of requested input columns to select, if specified.
+ val numSelectedInputExpressions = dataIn.readInt()
+ val selectedInputExpressions =
ArrayBuffer.empty[PythonUDTFSelectedExpression]
+ for (_ <- 0 until numSelectedInputExpressions) {
+ val expressionSql: String = PythonWorkerUtils.readUTF(dataIn)
+ val parsed: Expression = parser.parseExpression(expressionSql)
+ val alias: String = PythonWorkerUtils.readUTF(dataIn)
+ selectedInputExpressions.append(
+ PythonUDTFSelectedExpression(
+ parsed,
+ if (alias.nonEmpty) Some(alias) else None))
+ }
PythonUDTFAnalyzeResult(
schema = schema,
withSinglePartition = withSinglePartition,
partitionByExpressions = partitionByExpressions.toSeq,
orderByExpressions = orderBy.toSeq,
+ selectedInputExpressions = selectedInputExpressions.toSeq,
pickledAnalyzeResult = pickledAnalyzeResult)
}
}
diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/udtf/udtf.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/udtf/udtf.sql.out
index 2eeb25633ced..8cf567c14b3b 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/udtf/udtf.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/udtf/udtf.sql.out
@@ -284,6 +284,85 @@ SELECT * FROM UDTFPartitionByOrderByComplexExpr(TABLE(t2))
[Analyzer test output redacted due to nondeterminism]
+-- !query
+SELECT * FROM UDTFPartitionByOrderBySelectExpr(TABLE(t2))
+-- !query analysis
+[Analyzer test output redacted due to nondeterminism]
+
+
+-- !query
+SELECT * FROM UDTFPartitionByOrderBySelectComplexExpr(TABLE(t2))
+-- !query analysis
+[Analyzer test output redacted due to nondeterminism]
+
+
+-- !query
+SELECT * FROM UDTFPartitionByOrderBySelectExprOnlyPartitionColumn(TABLE(t2))
+-- !query analysis
+[Analyzer test output redacted due to nondeterminism]
+
+
+-- !query
+SELECT * FROM UDTFInvalidSelectExprParseError(TABLE(t2))
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+ "sqlState" : "42703",
+ "messageParameters" : {
+ "objectName" : "`unparsable`",
+ "proposal" : "`t2`.`input`, `partition_by_0`, `t2`.`partition_col`"
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 1,
+ "stopIndex" : 10,
+ "fragment" : "unparsable"
+ } ]
+}
+
+
+-- !query
+SELECT * FROM UDTFInvalidSelectExprStringValue(TABLE(t2))
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" : "TABLE_VALUED_FUNCTION_FAILED_TO_ANALYZE_IN_PYTHON",
+ "sqlState" : "38000",
+ "messageParameters" : {
+ "msg" : "Failed to evaluate the user-defined table function
'UDTFInvalidSelectExprStringValue' because the static 'analyze' method returned
an 'AnalyzeResult' object with the 'select' field set to a value besides a list
or tuple of 'SelectedColumn' objects. Please update the table function and then
try the query again."
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 15,
+ "stopIndex" : 57,
+ "fragment" : "UDTFInvalidSelectExprStringValue(TABLE(t2))"
+ } ]
+}
+
+
+-- !query
+SELECT * FROM UDTFInvalidComplexSelectExprMissingAlias(TABLE(t2))
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" :
"UDTF_INVALID_REQUESTED_SELECTED_EXPRESSION_FROM_ANALYZE_METHOD_REQUIRES_ALIAS",
+ "sqlState" : "42802",
+ "messageParameters" : {
+ "expression" : "(input + 1)"
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 15,
+ "stopIndex" : 65,
+ "fragment" : "UDTFInvalidComplexSelectExprMissingAlias(TABLE(t2))"
+ } ]
+}
+
+
-- !query
SELECT * FROM UDTFInvalidOrderByAscKeyword(TABLE(t2))
-- !query analysis
diff --git a/sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql
b/sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql
index 304ee1168e48..03d5b001d102 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql
@@ -76,6 +76,12 @@ SELECT * FROM
JOIN LATERAL
UDTFPartitionByOrderBy(TABLE(t2) PARTITION BY partition_col);
SELECT * FROM UDTFPartitionByOrderByComplexExpr(TABLE(t2));
+SELECT * FROM UDTFPartitionByOrderBySelectExpr(TABLE(t2));
+SELECT * FROM UDTFPartitionByOrderBySelectComplexExpr(TABLE(t2));
+SELECT * FROM UDTFPartitionByOrderBySelectExprOnlyPartitionColumn(TABLE(t2));
+SELECT * FROM UDTFInvalidSelectExprParseError(TABLE(t2));
+SELECT * FROM UDTFInvalidSelectExprStringValue(TABLE(t2));
+SELECT * FROM UDTFInvalidComplexSelectExprMissingAlias(TABLE(t2));
SELECT * FROM UDTFInvalidOrderByAscKeyword(TABLE(t2));
-- As a reminder, UDTFInvalidPartitionByAndWithSinglePartition returns this
analyze result:
-- AnalyzeResult(
diff --git a/sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out
b/sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out
index 0ff226df2c86..c85eacdc348b 100644
--- a/sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out
@@ -344,6 +344,104 @@ struct<partition_col:int,count:int,total:int,last:int>
1 2 5 3
+-- !query
+SELECT * FROM UDTFPartitionByOrderBySelectExpr(TABLE(t2))
+-- !query schema
+struct<partition_col:int,count:int,total:int,last:int>
+-- !query output
+0 1 1 1
+1 2 5 3
+
+
+-- !query
+SELECT * FROM UDTFPartitionByOrderBySelectComplexExpr(TABLE(t2))
+-- !query schema
+struct<partition_col:int,count:int,total:int,last:int>
+-- !query output
+0 1 2 2
+1 2 7 4
+
+
+-- !query
+SELECT * FROM UDTFPartitionByOrderBySelectExprOnlyPartitionColumn(TABLE(t2))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.api.python.PythonException
+ValueError: 'input' is not in list
+
+During handling of the above exception, another exception occurred:
+
+pyspark.errors.exceptions.base.PySparkValueError: input
+
+
+-- !query
+SELECT * FROM UDTFInvalidSelectExprParseError(TABLE(t2))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+ "errorClass" : "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+ "sqlState" : "42703",
+ "messageParameters" : {
+ "objectName" : "`unparsable`",
+ "proposal" : "`t2`.`input`, `partition_by_0`, `t2`.`partition_col`"
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 1,
+ "stopIndex" : 10,
+ "fragment" : "unparsable"
+ } ]
+}
+
+
+-- !query
+SELECT * FROM UDTFInvalidSelectExprStringValue(TABLE(t2))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" : "TABLE_VALUED_FUNCTION_FAILED_TO_ANALYZE_IN_PYTHON",
+ "sqlState" : "38000",
+ "messageParameters" : {
+ "msg" : "Failed to evaluate the user-defined table function
'UDTFInvalidSelectExprStringValue' because the static 'analyze' method returned
an 'AnalyzeResult' object with the 'select' field set to a value besides a list
or tuple of 'SelectedColumn' objects. Please update the table function and then
try the query again."
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 15,
+ "stopIndex" : 57,
+ "fragment" : "UDTFInvalidSelectExprStringValue(TABLE(t2))"
+ } ]
+}
+
+
+-- !query
+SELECT * FROM UDTFInvalidComplexSelectExprMissingAlias(TABLE(t2))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+ "errorClass" :
"UDTF_INVALID_REQUESTED_SELECTED_EXPRESSION_FROM_ANALYZE_METHOD_REQUIRES_ALIAS",
+ "sqlState" : "42802",
+ "messageParameters" : {
+ "expression" : "(input + 1)"
+ },
+ "queryContext" : [ {
+ "objectType" : "",
+ "objectName" : "",
+ "startIndex" : 15,
+ "stopIndex" : 65,
+ "fragment" : "UDTFInvalidComplexSelectExprMissingAlias(TABLE(t2))"
+ } ]
+}
+
+
-- !query
SELECT * FROM UDTFInvalidOrderByAscKeyword(TABLE(t2))
-- !query schema
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
index 62f3d7830ab0..848c38c95da5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
@@ -586,10 +586,12 @@ object IntegratedUDFTestUtils extends SQLHelper {
abstract class TestPythonUDTFPartitionByOrderByBase(
partitionBy: String,
- orderBy: String) extends TestUDTF {
+ orderBy: String,
+ select: String) extends TestUDTF {
val pythonScript: String =
s"""
|from pyspark.sql.functions import AnalyzeResult, OrderingColumn,
PartitioningColumn
+ |from pyspark.sql.functions import SelectedColumn
|from pyspark.sql.types import IntegerType, Row, StructType
|class $name:
| def __init__(self):
@@ -611,6 +613,9 @@ object IntegratedUDFTestUtils extends SQLHelper {
| ],
| orderBy=[
| OrderingColumn("$orderBy")
+ | ],
+ | select=[
+ | $select
| ])
|
| def eval(self, row: Row):
@@ -627,22 +632,63 @@ object IntegratedUDFTestUtils extends SQLHelper {
object UDTFPartitionByOrderBy
extends TestPythonUDTFPartitionByOrderByBase(
partitionBy = "partition_col",
- orderBy = "input")
+ orderBy = "input",
+ select = "")
object UDTFPartitionByOrderByComplexExpr
extends TestPythonUDTFPartitionByOrderByBase(
partitionBy = "partition_col + 1",
- orderBy = "RANDOM(42)")
+ orderBy = "RANDOM(42)",
+ select = "")
+
+ object UDTFPartitionByOrderBySelectExpr
+ extends TestPythonUDTFPartitionByOrderByBase(
+ partitionBy = "partition_col",
+ orderBy = "input",
+ select = "SelectedColumn(\"partition_col\"), SelectedColumn(\"input\")")
+
+ object UDTFPartitionByOrderBySelectComplexExpr
+ extends TestPythonUDTFPartitionByOrderByBase(
+ partitionBy = "partition_col + 1",
+ orderBy = "RANDOM(42)",
+ select = "SelectedColumn(\"partition_col\"), " +
+ "SelectedColumn(name=\"input + 1\", alias=\"input\")")
+
+ object UDTFPartitionByOrderBySelectExprOnlyPartitionColumn
+ extends TestPythonUDTFPartitionByOrderByBase(
+ partitionBy = "partition_col",
+ orderBy = "input",
+ select = "SelectedColumn(\"partition_col\")")
object UDTFInvalidPartitionByOrderByParseError
extends TestPythonUDTFPartitionByOrderByBase(
partitionBy = "unparsable",
- orderBy = "input")
+ orderBy = "input",
+ select = "")
object UDTFInvalidOrderByAscKeyword
extends TestPythonUDTFPartitionByOrderByBase(
partitionBy = "partition_col",
- orderBy = "partition_col ASC")
+ orderBy = "partition_col ASC",
+ select = "")
+
+ object UDTFInvalidSelectExprParseError
+ extends TestPythonUDTFPartitionByOrderByBase(
+ partitionBy = "partition_col",
+ orderBy = "input",
+ select = "SelectedColumn(\"unparsable\")")
+
+ object UDTFInvalidSelectExprStringValue
+ extends TestPythonUDTFPartitionByOrderByBase(
+ partitionBy = "partition_col",
+ orderBy = "input",
+ select = "\"partition_cll\"")
+
+ object UDTFInvalidComplexSelectExprMissingAlias
+ extends TestPythonUDTFPartitionByOrderByBase(
+ partitionBy = "partition_col + 1",
+ orderBy = "RANDOM(42)",
+ select = "SelectedColumn(name=\"input + 1\")")
object UDTFInvalidPartitionByAndWithSinglePartition extends TestUDTF {
val pythonScript: String =
@@ -1151,12 +1197,18 @@ object IntegratedUDFTestUtils extends SQLHelper {
UDTFWithSinglePartition,
UDTFPartitionByOrderBy,
UDTFInvalidOrderByAscKeyword,
+ UDTFInvalidSelectExprParseError,
+ UDTFInvalidSelectExprStringValue,
+ UDTFInvalidComplexSelectExprMissingAlias,
UDTFInvalidPartitionByAndWithSinglePartition,
UDTFInvalidPartitionByOrderByParseError,
UDTFInvalidOrderByWithoutPartitionBy,
UDTFForwardStateFromAnalyze,
UDTFForwardStateFromAnalyzeWithKwargs,
UDTFPartitionByOrderByComplexExpr,
+ UDTFPartitionByOrderBySelectExpr,
+ UDTFPartitionByOrderBySelectComplexExpr,
+ UDTFPartitionByOrderBySelectExprOnlyPartitionColumn,
InvalidAnalyzeMethodReturnsNonStructTypeSchema,
InvalidAnalyzeMethodWithSinglePartitionNoInputTable,
InvalidAnalyzeMethodWithPartitionByNoInputTable,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]