This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 973b6746d4c [SPARK-40153][SQL] Unify resolve functions and table-valued functions
973b6746d4c is described below

commit 973b6746d4ced960955fbd2ca82c76bdea239ab9
Author: allisonwang-db <allison.w...@databricks.com>
AuthorDate: Fri Aug 26 10:39:30 2022 +0800

    [SPARK-40153][SQL] Unify resolve functions and table-valued functions
    
    ### What changes were proposed in this pull request?
    
    This PR merges the analyzer rule `ResolveTableValuedFunctions` into `ResolveFunctions`.
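    
    Resolution of both kinds of functions now flows through a single rule.
    A minimal, illustrative sketch via the public API (a hedged example, not
    part of this patch; it assumes a local Spark build, and the app name is
    arbitrary):
    
    ```scala
    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession.builder().master("local[*]").appName("tvf-demo").getOrCreate()
    // `range(3)` resolves as a table-valued function in the FROM clause and
    // `upper(...)` as a scalar function in the select list; both now go
    // through the single ResolveFunctions rule.
    spark.sql("SELECT upper('spark') AS s, id FROM range(3)").show()
    ```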
    
    ### Why are the changes needed?
    
    Unify the code logic and make the resolution of scalar and table-valued functions consistent.
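    
    For example, the alias handling of the old rule is preserved: a column
    alias list on a table-valued function adds a `Project` of `Alias`
    expressions over the resolved plan, and a mismatched alias count still
    fails analysis. A small illustration, reusing the `spark` session from
    the sketch above:
    
    ```scala
    // `t(x)` renames the single output column of range(3); the analyzer
    // wraps the resolved plan in a Project with Alias(id, "x").
    spark.sql("SELECT x FROM range(3) t(x)").show()
    // Too many aliases fails analysis with "Number of given aliases does
    // not match number of output columns. ...", e.g.:
    // spark.sql("SELECT * FROM range(3) t(a, b)")  // AnalysisException
    ```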
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing unit tests.
    
    Closes #37586 from allisonwang-db/spark-40153-table-valued-func.
    
    Authored-by: allisonwang-db <allison.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 53 +++++++++++++++++++-
 .../analysis/ResolveTableValuedFunctions.scala     | 58 ----------------------
 .../spark/sql/catalyst/analysis/unresolved.scala   | 15 +++++-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  2 +-
 .../spark/sql/catalyst/trees/TreePatterns.scala    |  3 +-
 5 files changed, 67 insertions(+), 64 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index aa177bcbcc8..669857b6a11 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -277,7 +277,6 @@ class Analyzer(override val catalogManager: CatalogManager)
     Batch("Keep Legacy Outputs", Once,
       KeepLegacyOutputs),
     Batch("Resolution", fixedPoint,
-      ResolveTableValuedFunctions(v1SessionCatalog) ::
       new ResolveCatalogs(catalogManager) ::
       ResolveUserSpecifiedColumns ::
       ResolveInsertInto ::
@@ -2088,12 +2087,15 @@ class Analyzer(override val catalogManager: CatalogManager)
   /**
    * Replaces [[UnresolvedFunc]]s with concrete [[LogicalPlan]]s.
    * Replaces [[UnresolvedFunction]]s with concrete [[Expression]]s.
+   * Replaces [[UnresolvedGenerator]]s with concrete [[Expression]]s.
+   * Replaces [[UnresolvedTableValuedFunction]]s with concrete [[LogicalPlan]]s.
    */
   object ResolveFunctions extends Rule[LogicalPlan] {
     val trimWarningEnabled = new AtomicBoolean(true)
 
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
-      _.containsAnyPattern(UNRESOLVED_FUNC, UNRESOLVED_FUNCTION, GENERATOR), ruleId) {
+      _.containsAnyPattern(UNRESOLVED_FUNC, UNRESOLVED_FUNCTION, GENERATOR,
+        UNRESOLVED_TABLE_VALUED_FUNCTION), ruleId) {
       // Resolve functions with concrete relations from v2 catalog.
       case u @ UnresolvedFunc(nameParts, cmd, requirePersistentFunc, mismatchHint, _) =>
         lookupBuiltinOrTempFunction(nameParts)
@@ -2112,6 +2114,43 @@ class Analyzer(override val catalogManager: CatalogManager)
           }.getOrElse(u.copy(possibleQualifiedName = Some(fullName)))
         }
 
+      // Resolve table-valued function references.
+      case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) =>
+        withPosition(u) {
+          val resolvedFunc = try {
+            resolveBuiltinOrTempTableFunction(u.name, u.functionArgs).getOrElse {
+              val CatalogAndIdentifier(catalog, ident) = expandIdentifier(u.name)
+              if (CatalogV2Util.isSessionCatalog(catalog)) {
+                v1SessionCatalog.resolvePersistentTableFunction(
+                  ident.asFunctionIdentifier, u.functionArgs)
+              } else {
+                throw QueryCompilationErrors.missingCatalogAbilityError(
+                  catalog, "table-valued functions")
+              }
+            }
+          } catch {
+            case _: NoSuchFunctionException =>
+              u.failAnalysis(s"could not resolve `${u.name.quoted}` to a table-valued function")
+          }
+          // If alias names assigned, add `Project` with the aliases
+          if (u.outputNames.nonEmpty) {
+            val outputAttrs = resolvedFunc.output
+            // Checks if the number of the aliases is equal to expected one
+            if (u.outputNames.size != outputAttrs.size) {
+              u.failAnalysis(
+                s"Number of given aliases does not match number of output 
columns. " +
+                  s"Function name: ${u.name.quoted}; number of aliases: " +
+                  s"${u.outputNames.size}; number of output columns: 
${outputAttrs.size}.")
+            }
+            val aliases = outputAttrs.zip(u.outputNames).map {
+              case (attr, name) => Alias(attr, name)()
+            }
+            Project(aliases, resolvedFunc)
+          } else {
+            resolvedFunc
+          }
+        }
+
       case q: LogicalPlan =>
         q.transformExpressionsWithPruning(
           _.containsAnyPattern(UNRESOLVED_FUNCTION, GENERATOR), ruleId) {
@@ -2189,6 +2228,16 @@ class Analyzer(override val catalogManager: CatalogManager)
       }
     }
 
+    private def resolveBuiltinOrTempTableFunction(
+        name: Seq[String],
+        arguments: Seq[Expression]): Option[LogicalPlan] = {
+      if (name.length == 1) {
+        v1SessionCatalog.resolveBuiltinOrTempTableFunction(name.head, arguments)
+      } else {
+        None
+      }
+    }
+
     private def resolveV1Function(
         ident: FunctionIdentifier,
         arguments: Seq[Expression],
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
deleted file mode 100644
index a3f7ec5d3b8..00000000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveTableValuedFunctions.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.analysis
-
-import org.apache.spark.sql.catalyst.catalog.SessionCatalog
-import org.apache.spark.sql.catalyst.expressions.Alias
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.rules._
-
-/**
- * Rule that resolves table-valued function references.
- */
-case class ResolveTableValuedFunctions(catalog: SessionCatalog) extends Rule[LogicalPlan] {
-
-  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case u: UnresolvedTableValuedFunction if u.functionArgs.forall(_.resolved) =>
-      withPosition(u) {
-        val resolvedFunc = try {
-          catalog.lookupTableFunction(u.name, u.functionArgs)
-        } catch {
-          case _: NoSuchFunctionException =>
-            u.failAnalysis(s"could not resolve `${u.name}` to a table-valued function")
-        }
-        // If alias names assigned, add `Project` with the aliases
-        if (u.outputNames.nonEmpty) {
-          val outputAttrs = resolvedFunc.output
-          // Checks if the number of the aliases is equal to expected one
-          if (u.outputNames.size != outputAttrs.size) {
-            u.failAnalysis(
-              s"Number of given aliases does not match number of output 
columns. " +
-                s"Function name: ${u.name}; number of aliases: " +
-                s"${u.outputNames.size}; number of output columns: 
${outputAttrs.size}.")
-          }
-          val aliases = outputAttrs.zip(u.outputNames).map {
-            case (attr, name) => Alias(attr, name)()
-          }
-          Project(aliases, resolvedFunc)
-        } else {
-          resolvedFunc
-        }
-      }
-  }
-}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index 9d24ae4a159..25dc8494cad 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -106,7 +106,7 @@ case class UnresolvedInlineTable(
  *                    adds [[Project]] to rename the output columns.
  */
 case class UnresolvedTableValuedFunction(
-    name: FunctionIdentifier,
+    name: Seq[String],
     functionArgs: Seq[Expression],
     outputNames: Seq[String])
   extends LeafNode {
@@ -114,14 +114,25 @@ case class UnresolvedTableValuedFunction(
   override def output: Seq[Attribute] = Nil
 
   override lazy val resolved = false
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_TABLE_VALUED_FUNCTION)
 }
 
 object UnresolvedTableValuedFunction {
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
   def apply(
       name: String,
       functionArgs: Seq[Expression],
       outputNames: Seq[String]): UnresolvedTableValuedFunction = {
-    UnresolvedTableValuedFunction(FunctionIdentifier(name), functionArgs, outputNames)
+    UnresolvedTableValuedFunction(Seq(name), functionArgs, outputNames)
+  }
+
+  def apply(
+      name: FunctionIdentifier,
+      functionArgs: Seq[Expression],
+      outputNames: Seq[String]): UnresolvedTableValuedFunction = {
+    UnresolvedTableValuedFunction(name.asMultipart, functionArgs, outputNames)
   }
 }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 281146d3a38..76de49d86dc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1309,7 +1309,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
     }
 
     val tvf = UnresolvedTableValuedFunction(
-      name.asFunctionIdentifier, func.expression.asScala.map(expression).toSeq, aliases)
+      name, func.expression.asScala.map(expression).toSeq, aliases)
     tvf.optionalMap(func.tableAlias.strictIdentifier)(aliasPlan)
   }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index 3342f11a0fa..8fca9ec60cd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -131,8 +131,9 @@ object TreePattern extends Enumeration  {
   val UNRESOLVED_WINDOW_EXPRESSION: Value = Value
 
   // Unresolved Plan patterns (Alphabetically ordered)
-  val UNRESOLVED_SUBQUERY_COLUMN_ALIAS: Value = Value
   val UNRESOLVED_FUNC: Value = Value
+  val UNRESOLVED_SUBQUERY_COLUMN_ALIAS: Value = Value
+  val UNRESOLVED_TABLE_VALUED_FUNCTION: Value = Value
 
   // Execution expression patterns (alphabetically ordered)
   val IN_SUBQUERY_EXEC: Value = Value


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
