This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new fdedd73 [SPARK-34421][SQL] Resolve temporary functions and views in views with CTEs
fdedd73 is described below
commit fdedd7371d77e5211c25e9b52c57acd3d4a30e01
Author: Peter Toth <[email protected]>
AuthorDate: Fri Feb 19 18:14:49 2021 +0800
[SPARK-34421][SQL] Resolve temporary functions and views in views with CTEs
### What changes were proposed in this pull request?
This PR:
- Fixes a bug that prevents analysis of:
```
CREATE TEMPORARY VIEW temp_view AS WITH cte AS (SELECT temp_func(0))
SELECT * FROM cte;
SELECT * FROM temp_view
```
by throwing:
```
Undefined function: 'temp_func'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.
```
- and doesn't report an analysis error when it should:
```
CREATE TEMPORARY VIEW temp_view AS SELECT 0;
CREATE VIEW view_on_temp_view AS WITH cte AS (SELECT * FROM temp_view)
SELECT * FROM cte
```
by properly collecting temporary objects from VIEW definitions with CTEs.
- Minor refactor to make the affected code more readable.
### Why are the changes needed?
To fix a bug introduced with https://github.com/apache/spark/pull/30567
### Does this PR introduce _any_ user-facing change?
Yes, the query works again.
### How was this patch tested?
Added new UT + existing ones.
Closes #31550 from peter-toth/SPARK-34421-temp-functions-in-views-with-cte.
Authored-by: Peter Toth <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 27abb6ab5674b8663440dc738a0ba79c185fb063)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/sql/execution/command/views.scala | 31 +++++----
.../scala/org/apache/spark/sql/SQLQuerySuite.scala | 78 ++++++++++++++++++++++
2 files changed, 97 insertions(+), 12 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 81f2c0f..960fe4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, Pe
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View, With}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -560,23 +560,30 @@ object ViewHelper {
private def collectTemporaryObjects(
catalog: SessionCatalog, child: LogicalPlan): (Seq[Seq[String]], Seq[String]) = {
def collectTempViews(child: LogicalPlan): Seq[Seq[String]] = {
- child.collect {
+ child.flatMap {
case UnresolvedRelation(nameParts, _, _) if catalog.isTempView(nameParts) =>
Seq(nameParts)
- case plan if !plan.resolved => plan.expressions.flatMap(_.collect {
+ case w: With if !w.resolved => w.innerChildren.flatMap(collectTempViews)
+ case plan if !plan.resolved => plan.expressions.flatMap(_.flatMap {
case e: SubqueryExpression => collectTempViews(e.plan)
- }).flatten
- }.flatten.distinct
+ case _ => Seq.empty
+ })
+ case _ => Seq.empty
+ }.distinct
}

def collectTempFunctions(child: LogicalPlan): Seq[String] = {
- child.collect {
- case plan if !plan.resolved => plan.expressions.flatMap(_.collect {
- case e: SubqueryExpression => collectTempFunctions(e.plan)
- case e: UnresolvedFunction if catalog.isTemporaryFunction(e.name) =>
- Seq(e.name.funcName)
- }).flatten
- }.flatten.distinct
+ child.flatMap {
+ case w: With if !w.resolved => w.innerChildren.flatMap(collectTempFunctions)
+ case plan if !plan.resolved =>
+ plan.expressions.flatMap(_.flatMap {
+ case e: SubqueryExpression => collectTempFunctions(e.plan)
+ case e: UnresolvedFunction if catalog.isTemporaryFunction(e.name) =>
+ Seq(e.name.funcName)
+ case _ => Seq.empty
+ })
+ case _ => Seq.empty
+ }.distinct
}
(collectTempViews(child), collectTempFunctions(child))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 0808b80..e42fb96 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3848,6 +3848,84 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}
}
}
+
+ test("SPARK-34421: Resolve temporary objects in temporary views with CTEs") {
+ val tempFuncName = "temp_func"
+ withUserDefinedFunction(tempFuncName -> true) {
+ spark.udf.register(tempFuncName, identity[Int](_))
+
+ val tempViewName = "temp_view"
+ withTempView(tempViewName) {
+ sql(s"CREATE TEMPORARY VIEW $tempViewName AS SELECT 1")
+
+ val testViewName = "test_view"
+
+ withTempView(testViewName) {
+ sql(
+ s"""
+ |CREATE TEMPORARY VIEW $testViewName AS
+ |WITH cte AS (
+ | SELECT $tempFuncName(0)
+ |)
+ |SELECT * FROM cte
+ |""".stripMargin)
+ checkAnswer(sql(s"SELECT * FROM $testViewName"), Row(0))
+ }
+
+ withTempView(testViewName) {
+ sql(
+ s"""
+ |CREATE TEMPORARY VIEW $testViewName AS
+ |WITH cte AS (
+ | SELECT * FROM $tempViewName
+ |)
+ |SELECT * FROM cte
+ |""".stripMargin)
+ checkAnswer(sql(s"SELECT * FROM $testViewName"), Row(1))
+ }
+ }
+ }
+ }
+
+ test("SPARK-34421: Resolve temporary objects in permanent views with CTEs") {
+ val tempFuncName = "temp_func"
+ withUserDefinedFunction((tempFuncName, true)) {
+ spark.udf.register(tempFuncName, identity[Int](_))
+
+ val tempViewName = "temp_view"
+ withTempView(tempViewName) {
+ sql(s"CREATE TEMPORARY VIEW $tempViewName AS SELECT 1")
+
+ val testViewName = "test_view"
+
+ val e = intercept[AnalysisException] {
+ sql(
+ s"""
+ |CREATE VIEW $testViewName AS
+ |WITH cte AS (
+ | SELECT * FROM $tempViewName
+ |)
+ |SELECT * FROM cte
+ |""".stripMargin)
+ }
+ assert(e.message.contains("Not allowed to create a permanent view " +
+ s"`default`.`$testViewName` by referencing a temporary view $tempViewName"))
+
+ val e2 = intercept[AnalysisException] {
+ sql(
+ s"""
+ |CREATE VIEW $testViewName AS
+ |WITH cte AS (
+ | SELECT $tempFuncName(0)
+ |)
+ |SELECT * FROM cte
+ |""".stripMargin)
+ }
+ assert(e2.message.contains("Not allowed to create a permanent view " +
+ s"`default`.`$testViewName` by referencing a temporary function `$tempFuncName`"))
+ }
+ }
+ }
}

case class Foo(bar: Option[String])
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]