This is an automated email from the ASF dual-hosted git repository.

ueshin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 31f85e5ff77c [SPARK-46966][PYTHON] Add UDTF API for 'analyze' method to indicate subset of input table columns to select
31f85e5ff77c is described below

commit 31f85e5ff77c9f9b704160c3e70849a488a4f40d
Author: Daniel Tenedorio <daniel.tenedo...@databricks.com>
AuthorDate: Wed Feb 7 13:42:44 2024 -0800

    [SPARK-46966][PYTHON] Add UDTF API for 'analyze' method to indicate subset of input table columns to select
    
    ### What changes were proposed in this pull request?
    
    This PR adds a UDTF API for the 'analyze' method to indicate a subset of input table columns to select.
    
    For example, this UDTF populates the 'select' list to indicate that Spark should return only two columns from the input table: 'input' and 'partition_col':
    
    ```
    from pyspark.sql.functions import AnalyzeResult, OrderingColumn, PartitioningColumn, SelectedColumn
    from pyspark.sql.types import IntegerType, Row, StructType
    class Udtf:
        def __init__(self):
            self._partition_col = None
            self._count = 0
            self._sum = 0
            self._last = None
    
        @staticmethod
        def analyze(row: Row):
            return AnalyzeResult(
                schema=StructType()
                    .add("user_id", IntegerType())
                    .add("count", IntegerType())
                    .add("total", IntegerType())
                    .add("last", IntegerType()),
                partitionBy=[
                    PartitioningColumn("user_id")
                ],
                orderBy=[
                    OrderingColumn("timestamp")
                ],
                select=[
                    SelectedColumn("input"),
                    SelectedColumn("partition_col")
                ])
    
        def eval(self, row: Row):
            self._partition_col = row["partition_col"]
            self._count += 1
            self._last = row["input"]
            self._sum += row["input"]
    
        def terminate(self):
            yield self._partition_col, self._count, self._sum, self._last
    ```
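
    As an illustration of the alias rule this patch enforces (a sketch based on the API above, not code taken from this commit): a 'select' entry that is a simple column reference needs no alias, but any computed expression must carry one so that the UDTF's 'eval' method can address the resulting column by name.

    ```
    from pyspark.sql.functions import SelectedColumn

    # A simple column reference needs no alias.
    SelectedColumn("partition_col")

    # A computed expression requires an alias (the name 'input_plus_one' here is
    # hypothetical) so that 'eval' can read the result as row["input_plus_one"];
    # omitting the alias fails analysis with the new error condition
    # UDTF_INVALID_REQUESTED_SELECTED_EXPRESSION_FROM_ANALYZE_METHOD_REQUIRES_ALIAS.
    SelectedColumn(name="input + 1", alias="input_plus_one")
    ```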
    
    ### Why are the changes needed?
    
    This can reduce the amount of data sent between the JVM and the Python interpreter, improving performance. For example, if the input table is wide but the UDTF consumes only two of its columns, Spark needs to serialize and transfer only those two columns to the Python worker.
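
    As a minimal usage sketch (assuming a SparkSession named 'spark' and an input table 't' containing the columns referenced above; this is not code from this commit), the class above could be registered and invoked as follows, with Spark pruning the input table down to the two requested columns before any rows reach the Python worker:

    ```
    from pyspark.sql.functions import udtf

    # No returnType is passed: the output schema comes from the 'analyze' method.
    spark.udtf.register("my_udtf", udtf(Udtf))
    spark.sql("SELECT * FROM my_udtf(TABLE(t))").show()
    ```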
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    This PR adds test coverage.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #45007 from dtenedor/udtf-select-cols.
    
    Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
    Signed-off-by: Takuya UESHIN <ues...@databricks.com>
---
 .../src/main/resources/error/error-classes.json    |  6 ++
 docs/sql-error-conditions.md                       |  6 ++
 python/pyspark/sql/functions/builtin.py            |  2 +-
 python/pyspark/sql/udtf.py                         | 32 ++++++-
 python/pyspark/sql/worker/analyze_udtf.py          | 20 ++++-
 .../FunctionTableSubqueryArgumentExpression.scala  | 21 ++++-
 .../spark/sql/catalyst/expressions/PythonUDF.scala | 27 +++++-
 .../spark/sql/errors/QueryCompilationErrors.scala  |  6 ++
 .../python/UserDefinedPythonFunction.scala         | 15 +++-
 .../sql-tests/analyzer-results/udtf/udtf.sql.out   | 79 +++++++++++++++++
 .../test/resources/sql-tests/inputs/udtf/udtf.sql  |  6 ++
 .../resources/sql-tests/results/udtf/udtf.sql.out  | 98 ++++++++++++++++++++++
 .../apache/spark/sql/IntegratedUDFTestUtils.scala  | 62 ++++++++++++--
 13 files changed, 367 insertions(+), 13 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json
index e9dc715cf328..4fcf9248d3e2 100644
--- a/common/utils/src/main/resources/error/error-classes.json
+++ b/common/utils/src/main/resources/error/error-classes.json
@@ -3456,6 +3456,12 @@
     ],
     "sqlState" : "42802"
   },
+  "UDTF_INVALID_REQUESTED_SELECTED_EXPRESSION_FROM_ANALYZE_METHOD_REQUIRES_ALIAS" : {
+    "message" : [
+      "Failed to evaluate the user-defined table function because its 'analyze' method returned a requested 'select' expression (<expression>) that does not include a corresponding alias; please update the UDTF to specify an alias there and then try the query again."
+    ],
+    "sqlState" : "42802"
+  },
   "UNABLE_TO_ACQUIRE_MEMORY" : {
     "message" : [
       "Unable to acquire <requestedBytes> bytes of memory, got 
<receivedBytes>."
diff --git a/docs/sql-error-conditions.md b/docs/sql-error-conditions.md
index 7c51a6bcc145..f03a98e03d24 100644
--- a/docs/sql-error-conditions.md
+++ b/docs/sql-error-conditions.md
@@ -2215,6 +2215,12 @@ Please ensure that the number of aliases provided matches the number of columns
 
 Failed to evaluate the user-defined table function because its 'analyze' method returned a requested OrderingColumn whose column name expression included an unnecessary alias `<aliasName>`; please remove this alias and then try the query again.
 
+### UDTF_INVALID_REQUESTED_SELECTED_EXPRESSION_FROM_ANALYZE_METHOD_REQUIRES_ALIAS
+
+[SQLSTATE: 42802](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
+
+Failed to evaluate the user-defined table function because its 'analyze' method returned a requested 'select' expression (`<expression>`) that does not include a corresponding alias; please update the UDTF to specify an alias there and then try the query again.
+
 ### UNABLE_TO_ACQUIRE_MEMORY
 
 [SQLSTATE: 53200](sql-error-conditions-sqlstates.html#class-53-insufficient-resources)
diff --git a/python/pyspark/sql/functions/builtin.py b/python/pyspark/sql/functions/builtin.py
index cb872fdb8180..110006df4317 100644
--- a/python/pyspark/sql/functions/builtin.py
+++ b/python/pyspark/sql/functions/builtin.py
@@ -50,7 +50,7 @@ from pyspark.sql.types import ArrayType, DataType, StringType, StructType, _from
 # Keep UserDefinedFunction import for backwards compatible import; moved in SPARK-22409
 from pyspark.sql.udf import UserDefinedFunction, _create_py_udf  # noqa: F401
 from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult  # noqa: F401
-from pyspark.sql.udtf import OrderingColumn, PartitioningColumn  # noqa: F401
+from pyspark.sql.udtf import OrderingColumn, PartitioningColumn, SelectedColumn  # noqa: F401
 from pyspark.sql.udtf import SkipRestOfInputTableException  # noqa: F401
 from pyspark.sql.udtf import UserDefinedTableFunction, _create_py_udtf
 
diff --git a/python/pyspark/sql/udtf.py b/python/pyspark/sql/udtf.py
index fe3b0dc54314..c4ded5af71e4 100644
--- a/python/pyspark/sql/udtf.py
+++ b/python/pyspark/sql/udtf.py
@@ -43,6 +43,7 @@ __all__ = [
     "AnalyzeResult",
     "PartitioningColumn",
     "OrderingColumn",
+    "SelectedColumn",
     "SkipRestOfInputTableException",
     "UDTFRegistration",
 ]
@@ -108,6 +109,26 @@ class OrderingColumn:
     overrideNullsFirst: Optional[bool] = None
 
 
+@dataclass(frozen=True)
+class SelectedColumn:
+    """
+    Represents an expression that the UDTF is specifying for Catalyst to evaluate against the
+    columns in the input TABLE argument. The UDTF then receives one input column for each
+    expression in the list, in the order they are listed.
+
+    Parameters
+    ----------
+    name : str
+        The contents of the selected column name or expression represented as a SQL string.
+    alias : str, default ''
+        If non-empty, this is the alias for the column or expression as visible from the UDTF's
+        'eval' method. This is required if the expression is not a simple column reference.
+    """
+
+    name: str
+    alias: str = ""
+
+
 # Note: this class is a "dataclass" for purposes of convenience, but it is not marked "frozen"
 # because the intention is that users may create subclasses of it for purposes of returning custom
 # information from the "analyze" method.
@@ -118,13 +139,13 @@ class AnalyzeResult:
 
     Parameters
     ----------
-    schema : :class:`StructType`
+    schema: :class:`StructType`
         The schema that the Python UDTF will return.
-    withSinglePartition : bool
+    withSinglePartition: bool
         If true, the UDTF is specifying for Catalyst to repartition all rows of the input TABLE
         argument to one collection for consumption by exactly one instance of the correpsonding
         UDTF class.
-    partitionBy : sequence of :class:`PartitioningColumn`
+    partitionBy: sequence of :class:`PartitioningColumn`
         If non-empty, this is a sequence of expressions that the UDTF is specifying for Catalyst to
         partition the input TABLE argument by. In this case, calls to the UDTF may not include any
         explicit PARTITION BY clause, in which case Catalyst will return an error. This option is
@@ -133,12 +154,17 @@ class AnalyzeResult:
         If non-empty, this is a sequence of expressions that the UDTF is specifying for Catalyst to
         sort the input TABLE argument by. Note that the 'partitionBy' list must also be non-empty
         in this case.
+    select: sequence of :class:`SelectedColumn`
+        If non-empty, this is a sequence of expressions that the UDTF is specifying for Catalyst to
+        evaluate against the columns in the input TABLE argument. The UDTF then receives one input
+        attribute for each name in the list, in the order they are listed.
     """
 
     schema: StructType
     withSinglePartition: bool = False
     partitionBy: Sequence[PartitioningColumn] = field(default_factory=tuple)
     orderBy: Sequence[OrderingColumn] = field(default_factory=tuple)
+    select: Sequence[SelectedColumn] = field(default_factory=tuple)
 
 
 class SkipRestOfInputTableException(Exception):
diff --git a/python/pyspark/sql/worker/analyze_udtf.py b/python/pyspark/sql/worker/analyze_udtf.py
index ce21e4859770..f61330b806cd 100644
--- a/python/pyspark/sql/worker/analyze_udtf.py
+++ b/python/pyspark/sql/worker/analyze_udtf.py
@@ -31,7 +31,7 @@ from pyspark.serializers import (
     write_with_length,
     SpecialLengths,
 )
-from pyspark.sql.functions import PartitioningColumn
+from pyspark.sql.functions import PartitioningColumn, SelectedColumn
 from pyspark.sql.types import _parse_datatype_json_string, StructType
 from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
 from pyspark.util import handle_worker_exception
@@ -203,6 +203,19 @@ def main(infile: IO, outfile: IO) -> None:
                     and then try the query again."""
                 )
             )
+        elif isinstance(result.select, (list, tuple)) and (
+            len(result.select) > 0
+            and not all([isinstance(val, SelectedColumn) for val in result.select])
+        ):
+            raise PySparkValueError(
+                format_error(
+                    f"""
+                    {error_prefix} because the static 'analyze' method returned an
+                    'AnalyzeResult' object with the 'select' field set to a value besides a
+                    list or tuple of 'SelectedColumn' objects. Please update the table function
+                    and then try the query again."""
+                )
+            )
 
         # Return the analyzed schema.
         write_with_length(result.schema.json().encode("utf-8"), outfile)
@@ -225,6 +238,11 @@ def main(infile: IO, outfile: IO) -> None:
                 write_int(1, outfile)
             else:
                 write_int(2, outfile)
+        # Return the requested selected input table columns, if specified.
+        write_int(len(result.select), outfile)
+        for col in result.select:
+            write_with_length(col.name.encode("utf-8"), outfile)
+            write_with_length(col.alias.encode("utf-8"), outfile)
 
     except BaseException as e:
         handle_worker_exception(e, outfile)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala
index da2dd9b1e256..3c60e1dfb2c3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.catalyst.plans.logical.{HintInfo, LogicalPlan, Project, Repartition, RepartitionByExpression, Sort}
 import org.apache.spark.sql.catalyst.trees.TreePattern.{FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION, TreePattern}
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.types.DataType
 
 /**
@@ -58,6 +59,10 @@ import org.apache.spark.sql.types.DataType
 * @param orderByExpressions if non-empty, the TABLE argument included the ORDER BY clause to
 *                           indicate that the rows within each partition of the table function are
 *                           to arrive in the provided order.
+ * @param selectedInputExpressions If non-empty, this is a sequence of expressions that the UDTF is
+ *                                 specifying for Catalyst to evaluate against the columns in the
+ *                                 input TABLE argument. The UDTF then receives one input attribute
+ *                                 for each name in the list, in the order they are listed.
  */
 case class FunctionTableSubqueryArgumentExpression(
     plan: LogicalPlan,
@@ -65,7 +70,8 @@ case class FunctionTableSubqueryArgumentExpression(
     exprId: ExprId = NamedExpression.newExprId,
     partitionByExpressions: Seq[Expression] = Seq.empty,
     withSinglePartition: Boolean = false,
-    orderByExpressions: Seq[SortOrder] = Seq.empty)
+    orderByExpressions: Seq[SortOrder] = Seq.empty,
+    selectedInputExpressions: Seq[PythonUDTFSelectedExpression] = Seq.empty)
   extends SubqueryExpression(plan, outerAttrs, exprId, Seq.empty, None) with Unevaluable {
 
   assert(!(withSinglePartition && partitionByExpressions.nonEmpty),
@@ -134,6 +140,19 @@ case class FunctionTableSubqueryArgumentExpression(
           child = subquery)
       }
     }
+    // If instructed, add a projection to compute the specified input expressions.
+    if (selectedInputExpressions.nonEmpty) {
+      val projectList: Seq[NamedExpression] = selectedInputExpressions.map {
+        case PythonUDTFSelectedExpression(expression: Expression, Some(alias: String)) =>
+          Alias(expression, alias)()
+        case PythonUDTFSelectedExpression(a: Attribute, None) =>
+          a
+        case PythonUDTFSelectedExpression(other: Expression, None) =>
+          throw QueryCompilationErrors
+            .invalidUDTFSelectExpressionFromAnalyzeMethodNeedsAlias(other.sql)
+      } ++ extraProjectedPartitioningExpressions
+      subquery = Project(projectList, subquery)
+    }
     Project(Seq(Alias(CreateStruct(subquery.output), "c")()), subquery)
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
index f886b50e8a23..b38d161be383 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala
@@ -246,6 +246,12 @@ case class UnresolvedPolymorphicPythonUDTF(
 * @param orderByExpressions if non-empty, this contains the list of ordering items that the
 *                           'analyze' method explicitly indicated that the UDTF call should consume
 *                           the input table rows by
+ * @param selectedInputExpressions If non-empty, this is a list of expressions that the UDTF is
+ *                                 specifying for Catalyst to evaluate against the columns in the
+ *                                 input TABLE argument. In this case, Catalyst will insert a
+ *                                 projection to evaluate these expressions and return the result to
+ *                                 the UDTF. The UDTF then receives one input column for each
+ *                                 expression in the list, in the order they are listed.
 * @param pickledAnalyzeResult this is the pickled 'AnalyzeResult' instance from the UDTF, which
 *                             contains all metadata returned by the Python UDTF 'analyze' method
 *                             including the result schema of the function call as well as optional
@@ -256,6 +262,7 @@ case class PythonUDTFAnalyzeResult(
     withSinglePartition: Boolean,
     partitionByExpressions: Seq[Expression],
     orderByExpressions: Seq[SortOrder],
+    selectedInputExpressions: Seq[PythonUDTFSelectedExpression],
     pickledAnalyzeResult: Array[Byte]) {
   /**
   * Applies the requested properties from this analysis result to the target TABLE argument
@@ -291,6 +298,7 @@ case class PythonUDTFAnalyzeResult(
     var newWithSinglePartition = t.withSinglePartition
     var newPartitionByExpressions = t.partitionByExpressions
     var newOrderByExpressions = t.orderByExpressions
+    var newSelectedInputExpressions = t.selectedInputExpressions
     if (withSinglePartition) {
       newWithSinglePartition = true
     }
@@ -300,13 +308,30 @@ case class PythonUDTFAnalyzeResult(
     if (orderByExpressions.nonEmpty) {
       newOrderByExpressions = orderByExpressions
     }
+    if (selectedInputExpressions.nonEmpty) {
+      newSelectedInputExpressions = selectedInputExpressions
+    }
     t.copy(
       withSinglePartition = newWithSinglePartition,
       partitionByExpressions = newPartitionByExpressions,
-      orderByExpressions = newOrderByExpressions)
+      orderByExpressions = newOrderByExpressions,
+      selectedInputExpressions = newSelectedInputExpressions)
   }
 }
 
+/**
+ * Represents an expression that the UDTF is specifying for Catalyst to evaluate against the
+ * columns in the input TABLE argument. The UDTF then receives one input column for each expression
+ * in the list, in the order they are listed.
+ *
+ * @param expression the expression that the UDTF is specifying for Catalyst to evaluate against the
+ *                   columns in the input TABLE argument
+ * @param alias If present, this is the alias for the column or expression as visible from the
+ *              UDTF's 'eval' method. This is required if the expression is not a simple column
+ *              reference.
+ */
+case class PythonUDTFSelectedExpression(expression: Expression, alias: Option[String])
+
 /**
 * A place holder used when printing expressions without debugging information such as the
 * result id.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index f0cbe727edfe..46028817e8eb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -689,6 +689,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
       messageParameters = Map("aliasName" -> aliasName))
   }
 
+  def invalidUDTFSelectExpressionFromAnalyzeMethodNeedsAlias(expression: String): Throwable = {
+    new AnalysisException(
+      errorClass =
+        "UDTF_INVALID_REQUESTED_SELECTED_EXPRESSION_FROM_ANALYZE_METHOD_REQUIRES_ALIAS",
+      messageParameters = Map("expression" -> expression))
+  }
+
+
   def windowAggregateFunctionWithFilterNotSupportedError(): Throwable = {
     new AnalysisException(
       errorClass = "_LEGACY_ERROR_TEMP_1030",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala
index 6bb1992a064a..32406460ad71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala
@@ -25,7 +25,7 @@ import net.razorvine.pickle.Pickler
 
 import org.apache.spark.api.python.{PythonEvalType, PythonFunction, PythonWorkerUtils, SpecialLengths}
 import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Descending, Expression, FunctionTableSubqueryArgumentExpression, NamedArgumentExpression, NullsFirst, NullsLast, PythonUDAF, PythonUDF, PythonUDTF, PythonUDTFAnalyzeResult, SortOrder, UnresolvedPolymorphicPythonUDTF}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Descending, Expression, FunctionTableSubqueryArgumentExpression, NamedArgumentExpression, NullsFirst, NullsLast, PythonUDAF, PythonUDF, PythonUDTF, PythonUDTFAnalyzeResult, PythonUDTFSelectedExpression, SortOrder, UnresolvedPolymorphicPythonUDTF}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, LogicalPlan, NamedParametersSupport, OneRowRelation}
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -276,11 +276,24 @@ class UserDefinedPythonTableFunctionAnalyzeRunner(
         case 2 => orderBy.append(SortOrder(parsed, direction, NullsLast, Seq.empty))
       }
     }
+    // Receive the list of requested input columns to select, if specified.
+    val numSelectedInputExpressions = dataIn.readInt()
+    val selectedInputExpressions = ArrayBuffer.empty[PythonUDTFSelectedExpression]
+    for (_ <- 0 until numSelectedInputExpressions) {
+      val expressionSql: String = PythonWorkerUtils.readUTF(dataIn)
+      val parsed: Expression = parser.parseExpression(expressionSql)
+      val alias: String = PythonWorkerUtils.readUTF(dataIn)
+      selectedInputExpressions.append(
+        PythonUDTFSelectedExpression(
+          parsed,
+          if (alias.nonEmpty) Some(alias) else None))
+    }
     PythonUDTFAnalyzeResult(
       schema = schema,
       withSinglePartition = withSinglePartition,
       partitionByExpressions = partitionByExpressions.toSeq,
       orderByExpressions = orderBy.toSeq,
+      selectedInputExpressions = selectedInputExpressions.toSeq,
       pickledAnalyzeResult = pickledAnalyzeResult)
   }
 }
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/udtf/udtf.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/udtf/udtf.sql.out
index 2eeb25633ced..8cf567c14b3b 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/udtf/udtf.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/udtf/udtf.sql.out
@@ -284,6 +284,85 @@ SELECT * FROM UDTFPartitionByOrderByComplexExpr(TABLE(t2))
 [Analyzer test output redacted due to nondeterminism]
 
 
+-- !query
+SELECT * FROM UDTFPartitionByOrderBySelectExpr(TABLE(t2))
+-- !query analysis
+[Analyzer test output redacted due to nondeterminism]
+
+
+-- !query
+SELECT * FROM UDTFPartitionByOrderBySelectComplexExpr(TABLE(t2))
+-- !query analysis
+[Analyzer test output redacted due to nondeterminism]
+
+
+-- !query
+SELECT * FROM UDTFPartitionByOrderBySelectExprOnlyPartitionColumn(TABLE(t2))
+-- !query analysis
+[Analyzer test output redacted due to nondeterminism]
+
+
+-- !query
+SELECT * FROM UDTFInvalidSelectExprParseError(TABLE(t2))
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+  "sqlState" : "42703",
+  "messageParameters" : {
+    "objectName" : "`unparsable`",
+    "proposal" : "`t2`.`input`, `partition_by_0`, `t2`.`partition_col`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 10,
+    "fragment" : "unparsable"
+  } ]
+}
+
+
+-- !query
+SELECT * FROM UDTFInvalidSelectExprStringValue(TABLE(t2))
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "TABLE_VALUED_FUNCTION_FAILED_TO_ANALYZE_IN_PYTHON",
+  "sqlState" : "38000",
+  "messageParameters" : {
+    "msg" : "Failed to evaluate the user-defined table function 
'UDTFInvalidSelectExprStringValue' because the static 'analyze' method returned 
an 'AnalyzeResult' object with the 'select' field set to a value besides a list 
or tuple of 'SelectedColumn' objects. Please update the table function and then 
try the query again."
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 15,
+    "stopIndex" : 57,
+    "fragment" : "UDTFInvalidSelectExprStringValue(TABLE(t2))"
+  } ]
+}
+
+
+-- !query
+SELECT * FROM UDTFInvalidComplexSelectExprMissingAlias(TABLE(t2))
+-- !query analysis
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UDTF_INVALID_REQUESTED_SELECTED_EXPRESSION_FROM_ANALYZE_METHOD_REQUIRES_ALIAS",
+  "sqlState" : "42802",
+  "messageParameters" : {
+    "expression" : "(input + 1)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 15,
+    "stopIndex" : 65,
+    "fragment" : "UDTFInvalidComplexSelectExprMissingAlias(TABLE(t2))"
+  } ]
+}
+
+
 -- !query
 SELECT * FROM UDTFInvalidOrderByAscKeyword(TABLE(t2))
 -- !query analysis
diff --git a/sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql b/sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql
index 304ee1168e48..03d5b001d102 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql
@@ -76,6 +76,12 @@ SELECT * FROM
     JOIN LATERAL
     UDTFPartitionByOrderBy(TABLE(t2) PARTITION BY partition_col);
 SELECT * FROM UDTFPartitionByOrderByComplexExpr(TABLE(t2));
+SELECT * FROM UDTFPartitionByOrderBySelectExpr(TABLE(t2));
+SELECT * FROM UDTFPartitionByOrderBySelectComplexExpr(TABLE(t2));
+SELECT * FROM UDTFPartitionByOrderBySelectExprOnlyPartitionColumn(TABLE(t2));
+SELECT * FROM UDTFInvalidSelectExprParseError(TABLE(t2));
+SELECT * FROM UDTFInvalidSelectExprStringValue(TABLE(t2));
+SELECT * FROM UDTFInvalidComplexSelectExprMissingAlias(TABLE(t2));
 SELECT * FROM UDTFInvalidOrderByAscKeyword(TABLE(t2));
 -- As a reminder, UDTFInvalidPartitionByAndWithSinglePartition returns this analyze result:
 --     AnalyzeResult(
diff --git a/sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out b/sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out
index 0ff226df2c86..c85eacdc348b 100644
--- a/sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out
@@ -344,6 +344,104 @@ struct<partition_col:int,count:int,total:int,last:int>
 1      2       5       3
 
 
+-- !query
+SELECT * FROM UDTFPartitionByOrderBySelectExpr(TABLE(t2))
+-- !query schema
+struct<partition_col:int,count:int,total:int,last:int>
+-- !query output
+0      1       1       1
+1      2       5       3
+
+
+-- !query
+SELECT * FROM UDTFPartitionByOrderBySelectComplexExpr(TABLE(t2))
+-- !query schema
+struct<partition_col:int,count:int,total:int,last:int>
+-- !query output
+0      1       2       2
+1      2       7       4
+
+
+-- !query
+SELECT * FROM UDTFPartitionByOrderBySelectExprOnlyPartitionColumn(TABLE(t2))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.api.python.PythonException
+ValueError: 'input' is not in list
+
+During handling of the above exception, another exception occurred:
+
+pyspark.errors.exceptions.base.PySparkValueError: input
+
+
+-- !query
+SELECT * FROM UDTFInvalidSelectExprParseError(TABLE(t2))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+  "sqlState" : "42703",
+  "messageParameters" : {
+    "objectName" : "`unparsable`",
+    "proposal" : "`t2`.`input`, `partition_by_0`, `t2`.`partition_col`"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 1,
+    "stopIndex" : 10,
+    "fragment" : "unparsable"
+  } ]
+}
+
+
+-- !query
+SELECT * FROM UDTFInvalidSelectExprStringValue(TABLE(t2))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "TABLE_VALUED_FUNCTION_FAILED_TO_ANALYZE_IN_PYTHON",
+  "sqlState" : "38000",
+  "messageParameters" : {
+    "msg" : "Failed to evaluate the user-defined table function 
'UDTFInvalidSelectExprStringValue' because the static 'analyze' method returned 
an 'AnalyzeResult' object with the 'select' field set to a value besides a list 
or tuple of 'SelectedColumn' objects. Please update the table function and then 
try the query again."
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 15,
+    "stopIndex" : 57,
+    "fragment" : "UDTFInvalidSelectExprStringValue(TABLE(t2))"
+  } ]
+}
+
+
+-- !query
+SELECT * FROM UDTFInvalidComplexSelectExprMissingAlias(TABLE(t2))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : 
"UDTF_INVALID_REQUESTED_SELECTED_EXPRESSION_FROM_ANALYZE_METHOD_REQUIRES_ALIAS",
+  "sqlState" : "42802",
+  "messageParameters" : {
+    "expression" : "(input + 1)"
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 15,
+    "stopIndex" : 65,
+    "fragment" : "UDTFInvalidComplexSelectExprMissingAlias(TABLE(t2))"
+  } ]
+}
+
+
 -- !query
 SELECT * FROM UDTFInvalidOrderByAscKeyword(TABLE(t2))
 -- !query schema
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
index 62f3d7830ab0..848c38c95da5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala
@@ -586,10 +586,12 @@ object IntegratedUDFTestUtils extends SQLHelper {
 
   abstract class TestPythonUDTFPartitionByOrderByBase(
       partitionBy: String,
-      orderBy: String) extends TestUDTF {
+      orderBy: String,
+      select: String) extends TestUDTF {
     val pythonScript: String =
       s"""
         |from pyspark.sql.functions import AnalyzeResult, OrderingColumn, PartitioningColumn
+        |from pyspark.sql.functions import SelectedColumn
         |from pyspark.sql.types import IntegerType, Row, StructType
         |class $name:
         |    def __init__(self):
@@ -611,6 +613,9 @@ object IntegratedUDFTestUtils extends SQLHelper {
         |            ],
         |            orderBy=[
         |                OrderingColumn("$orderBy")
+        |            ],
+        |            select=[
+        |                $select
         |            ])
         |
         |    def eval(self, row: Row):
@@ -627,22 +632,63 @@ object IntegratedUDFTestUtils extends SQLHelper {
   object UDTFPartitionByOrderBy
     extends TestPythonUDTFPartitionByOrderByBase(
       partitionBy = "partition_col",
-      orderBy = "input")
+      orderBy = "input",
+      select = "")
 
   object UDTFPartitionByOrderByComplexExpr
     extends TestPythonUDTFPartitionByOrderByBase(
       partitionBy = "partition_col + 1",
-      orderBy = "RANDOM(42)")
+      orderBy = "RANDOM(42)",
+      select = "")
+
+  object UDTFPartitionByOrderBySelectExpr
+    extends TestPythonUDTFPartitionByOrderByBase(
+      partitionBy = "partition_col",
+      orderBy = "input",
+      select = "SelectedColumn(\"partition_col\"), SelectedColumn(\"input\")")
+
+  object UDTFPartitionByOrderBySelectComplexExpr
+    extends TestPythonUDTFPartitionByOrderByBase(
+      partitionBy = "partition_col + 1",
+      orderBy = "RANDOM(42)",
+      select = "SelectedColumn(\"partition_col\"), " +
+        "SelectedColumn(name=\"input + 1\", alias=\"input\")")
+
+  object UDTFPartitionByOrderBySelectExprOnlyPartitionColumn
+    extends TestPythonUDTFPartitionByOrderByBase(
+      partitionBy = "partition_col",
+      orderBy = "input",
+      select = "SelectedColumn(\"partition_col\")")
 
   object UDTFInvalidPartitionByOrderByParseError
     extends TestPythonUDTFPartitionByOrderByBase(
       partitionBy = "unparsable",
-      orderBy = "input")
+      orderBy = "input",
+      select = "")
 
   object UDTFInvalidOrderByAscKeyword
     extends TestPythonUDTFPartitionByOrderByBase(
       partitionBy = "partition_col",
-      orderBy = "partition_col ASC")
+      orderBy = "partition_col ASC",
+      select = "")
+
+  object UDTFInvalidSelectExprParseError
+    extends TestPythonUDTFPartitionByOrderByBase(
+      partitionBy = "partition_col",
+      orderBy = "input",
+      select = "SelectedColumn(\"unparsable\")")
+
+  object UDTFInvalidSelectExprStringValue
+    extends TestPythonUDTFPartitionByOrderByBase(
+      partitionBy = "partition_col",
+      orderBy = "input",
+      select = "\"partition_cll\"")
+
+  object UDTFInvalidComplexSelectExprMissingAlias
+    extends TestPythonUDTFPartitionByOrderByBase(
+      partitionBy = "partition_col + 1",
+      orderBy = "RANDOM(42)",
+      select = "SelectedColumn(name=\"input + 1\")")
 
   object UDTFInvalidPartitionByAndWithSinglePartition extends TestUDTF {
     val pythonScript: String =
@@ -1151,12 +1197,18 @@ object IntegratedUDFTestUtils extends SQLHelper {
     UDTFWithSinglePartition,
     UDTFPartitionByOrderBy,
     UDTFInvalidOrderByAscKeyword,
+    UDTFInvalidSelectExprParseError,
+    UDTFInvalidSelectExprStringValue,
+    UDTFInvalidComplexSelectExprMissingAlias,
     UDTFInvalidPartitionByAndWithSinglePartition,
     UDTFInvalidPartitionByOrderByParseError,
     UDTFInvalidOrderByWithoutPartitionBy,
     UDTFForwardStateFromAnalyze,
     UDTFForwardStateFromAnalyzeWithKwargs,
     UDTFPartitionByOrderByComplexExpr,
+    UDTFPartitionByOrderBySelectExpr,
+    UDTFPartitionByOrderBySelectComplexExpr,
+    UDTFPartitionByOrderBySelectExprOnlyPartitionColumn,
     InvalidAnalyzeMethodReturnsNonStructTypeSchema,
     InvalidAnalyzeMethodWithSinglePartitionNoInputTable,
     InvalidAnalyzeMethodWithPartitionByNoInputTable,

