This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 973b6746d4c [SPARK-40153][SQL] Unify resolve functions and table-valued functions 973b6746d4c is described below commit 973b6746d4ced960955fbd2ca82c76bdea239ab9 Author: allisonwang-db <allison.w...@databricks.com> AuthorDate: Fri Aug 26 10:39:30 2022 +0800 [SPARK-40153][SQL] Unify resolve functions and table-valued functions ### What changes were proposed in this pull request? This PR merges the analyzer rule `ResolveTableValuedFunctions` into `ResolveFunctions`. ### Why are the changes needed? To unify the code logic and make resolving scalar and table-valued functions consistent. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit tests. Closes #37586 from allisonwang-db/spark-40153-table-valued-func. Authored-by: allisonwang-db <allison.w...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/analysis/Analyzer.scala | 53 +++++++++++++++++++- .../analysis/ResolveTableValuedFunctions.scala | 58 ---------------------- .../spark/sql/catalyst/analysis/unresolved.scala | 15 +++++- .../spark/sql/catalyst/parser/AstBuilder.scala | 2 +- .../spark/sql/catalyst/trees/TreePatterns.scala | 3 +- 5 files changed, 67 insertions(+), 64 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index aa177bcbcc8..669857b6a11 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -277,7 +277,6 @@ class Analyzer(override val catalogManager: CatalogManager) Batch("Keep Legacy Outputs", Once, KeepLegacyOutputs), Batch("Resolution", fixedPoint, - ResolveTableValuedFunctions(v1SessionCatalog) :: new ResolveCatalogs(catalogManager) :: ResolveUserSpecifiedColumns :: 
ResolveInsertInto :: @@ -2088,12 +2087,15 @@ class Analyzer(override val catalogManager: CatalogManager) /** * Replaces [[UnresolvedFunc]]s with concrete [[LogicalPlan]]s. * Replaces [[UnresolvedFunction]]s with concrete [[Expression]]s. + * Replaces [[UnresolvedGenerator]]s with concrete [[Expression]]s. + * Replaces [[UnresolvedTableValuedFunction]]s with concrete [[LogicalPlan]]s. */ object ResolveFunctions extends Rule[LogicalPlan] { val trimWarningEnabled = new AtomicBoolean(true) def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning( - _.containsAnyPattern(UNRESOLVED_FUNC, UNRESOLVED_FUNCTION, GENERATOR), ruleId) { + _.containsAnyPattern(UNRESOLVED_FUNC, UNRESOLVED_FUNCTION, GENERATOR, + UNRESOLVED_TABLE_VALUED_FUNCTION), ruleId) { // Resolve functions with concrete relations from v2 catalog. case u @ UnresolvedFunc(nameParts, cmd, requirePersistentFunc, mismatchHint, _) => lookupBuiltinOrTempFunction(nameParts) @@ -2112,6 +2114,43 @@ class Analyzer(override val catalogManager: CatalogManager) }.getOrElse(u.copy(possibleQualifiedName = Some(fullName))) } + // Resolve table-valued function references. 
+ case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) => + withPosition(u) { + val resolvedFunc = try { + resolveBuiltinOrTempTableFunction(u.name, u.functionArgs).getOrElse { + val CatalogAndIdentifier(catalog, ident) = expandIdentifier(u.name) + if (CatalogV2Util.isSessionCatalog(catalog)) { + v1SessionCatalog.resolvePersistentTableFunction( + ident.asFunctionIdentifier, u.functionArgs) + } else { + throw QueryCompilationErrors.missingCatalogAbilityError( + catalog, "table-valued functions") + } + } + } catch { + case _: NoSuchFunctionException => + u.failAnalysis(s"could not resolve `${u.name.quoted}` to a table-valued function") + } + // If alias names assigned, add `Project` with the aliases + if (u.outputNames.nonEmpty) { + val outputAttrs = resolvedFunc.output + // Checks if the number of the aliases is equal to expected one + if (u.outputNames.size != outputAttrs.size) { + u.failAnalysis( + s"Number of given aliases does not match number of output columns. 
" + + s"Function name: ${u.name.quoted}; number of aliases: " + + s"${u.outputNames.size}; number of output columns: ${outputAttrs.size}.") + } + val aliases = outputAttrs.zip(u.outputNames).map { + case (attr, name) => Alias(attr, name)() + } + Project(aliases, resolvedFunc) + } else { + resolvedFunc + } + } + case q: LogicalPlan => q.transformExpressionsWithPruning( _.containsAnyPattern(UNRESOLVED_FUNCTION, GENERATOR), ruleId) { @@ -2189,6 +2228,16 @@ class Analyzer(override val catalogManager: CatalogManager) } } + private def resolveBuiltinOrTempTableFunction( + name: Seq[String], + arguments: Seq[Expression]): Option[LogicalPlan] = { + if (name.length == 1) { + v1SessionCatalog.resolveBuiltinOrTempTableFunction(name.head, arguments) + } else { + None + } + } + private def resolveV1Function( ident: FunctionIdentifier, arguments: Seq[Expression], diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala deleted file mode 100644 index a3f7ec5d3b8..00000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.analysis - -import org.apache.spark.sql.catalyst.catalog.SessionCatalog -import org.apache.spark.sql.catalyst.expressions.Alias -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} -import org.apache.spark.sql.catalyst.rules._ - -/** - * Rule that resolves table-valued function references. - */ -case class ResolveTableValuedFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] { - - override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) => - withPosition(u) { - val resolvedFunc = try { - catalog.lookupTableFunction(u.name, u.functionArgs) - } catch { - case _: NoSuchFunctionException => - u.failAnalysis(s"could not resolve `${u.name}` to a table-valued function") - } - // If alias names assigned, add `Project` with the aliases - if (u.outputNames.nonEmpty) { - val outputAttrs = resolvedFunc.output - // Checks if the number of the aliases is equal to expected one - if (u.outputNames.size != outputAttrs.size) { - u.failAnalysis( - s"Number of given aliases does not match number of output columns. 
" + - s"Function name: ${u.name}; number of aliases: " + - s"${u.outputNames.size}; number of output columns: ${outputAttrs.size}.") - } - val aliases = outputAttrs.zip(u.outputNames).map { - case (attr, name) => Alias(attr, name)() - } - Project(aliases, resolvedFunc) - } else { - resolvedFunc - } - } - } -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 9d24ae4a159..25dc8494cad 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -106,7 +106,7 @@ case class UnresolvedInlineTable( * adds [[Project]] to rename the output columns. */ case class UnresolvedTableValuedFunction( - name: FunctionIdentifier, + name: Seq[String], functionArgs: Seq[Expression], outputNames: Seq[String]) extends LeafNode { @@ -114,14 +114,25 @@ case class UnresolvedTableValuedFunction( override def output: Seq[Attribute] = Nil override lazy val resolved = false + + final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_TABLE_VALUED_FUNCTION) } object UnresolvedTableValuedFunction { + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + def apply( name: String, functionArgs: Seq[Expression], outputNames: Seq[String]): UnresolvedTableValuedFunction = { - UnresolvedTableValuedFunction(FunctionIdentifier(name), functionArgs, outputNames) + UnresolvedTableValuedFunction(Seq(name), functionArgs, outputNames) + } + + def apply( + name: FunctionIdentifier, + functionArgs: Seq[Expression], + outputNames: Seq[String]): UnresolvedTableValuedFunction = { + UnresolvedTableValuedFunction(name.asMultipart, functionArgs, outputNames) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 281146d3a38..76de49d86dc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -1309,7 +1309,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit } val tvf = UnresolvedTableValuedFunction( - name.asFunctionIdentifier, func.expression.asScala.map(expression).toSeq, aliases) + name, func.expression.asScala.map(expression).toSeq, aliases) tvf.optionalMap(func.tableAlias.strictIdentifier)(aliasPlan) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala index 3342f11a0fa..8fca9ec60cd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala @@ -131,8 +131,9 @@ object TreePattern extends Enumeration { val UNRESOLVED_WINDOW_EXPRESSION: Value = Value // Unresolved Plan patterns (Alphabetically ordered) - val UNRESOLVED_SUBQUERY_COLUMN_ALIAS: Value = Value val UNRESOLVED_FUNC: Value = Value + val UNRESOLVED_SUBQUERY_COLUMN_ALIAS: Value = Value + val UNRESOLVED_TABLE_VALUED_FUNCTION: Value = Value // Execution expression patterns (alphabetically ordered) val IN_SUBQUERY_EXEC: Value = Value --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org