This is an automated email from the ASF dual-hosted git repository.
yamamuro pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 25d7219 [SPARK-34719][SQL][3.0] Correctly resolve the view query with
duplicated column names
25d7219 is described below
commit 25d72191de7c842aa2acd4b7307ba8e6585dd182
Author: Wenchen Fan <[email protected]>
AuthorDate: Sat Mar 20 11:09:50 2021 +0900
[SPARK-34719][SQL][3.0] Correctly resolve the view query with duplicated
column names
backport https://github.com/apache/spark/pull/31811 to 3.0
### What changes were proposed in this pull request?
For permanent views (and the new SQL temp view in Spark 3.1), we store the
view SQL text and re-parse/analyze the view SQL text when reading the view. In
the case of `SELECT * FROM ...`, we want to avoid view schema change (e.g. the
referenced table changes its schema) and will record the view query output
column names when creating the view, so that when reading the view we can add a
`SELECT recorded_column_names FROM ...` to retain the original view query
schema.
In Spark 3.1 and before, the final SELECT is added after the analysis
phase:
https://github.com/apache/spark/blob/branch-3.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala#L67
If the view query has duplicated output column names, we always pick the
first column when reading a view. A simple repro:
```
scala> sql("create view c(x, y) as select 1 a, 2 a")
res0: org.apache.spark.sql.DataFrame = []
scala> sql("select * from c").show
+---+---+
| x| y|
+---+---+
| 1| 1|
+---+---+
```
In the master branch, we will fail at the view reading time due to
https://github.com/apache/spark/commit/b891862fb6b740b103d5a09530626ee4e0e8f6e3
, which adds the final SELECT during analysis, so that the query fails with
`Reference 'a' is ambiguous`
This PR proposes to resolve the view query output column names from the
matching attributes by ordinal.
For example, `create view c(x, y) as select 1 a, 2 a`, the view query
output column names are `[a, a]`. When we read the view, there are 2
matching attributes (e.g. `[a#1, a#2]`) and we can simply match them by ordinal.
A negative example is
```
create table t(a int)
create view v as select *, 1 as col from t
replace table t(a int, col int)
```
When reading the view, the view query output column names are `[a, col]`,
and there are two matching attributes of `col`, and we should fail the query.
See the tests for details.
### Why are the changes needed?
bug fix
### Does this PR introduce _any_ user-facing change?
yes
### How was this patch tested?
new test
Closes #31894 from cloud-fan/backport.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>
---
.../apache/spark/sql/catalyst/analysis/view.scala | 44 ++++++++++++++++++---
.../apache/spark/sql/execution/SQLViewSuite.scala | 45 +++++++++++++++++++++-
2 files changed, 82 insertions(+), 7 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
index 6560164..013a303 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/view.scala
@@ -17,7 +17,10 @@
package org.apache.spark.sql.catalyst.analysis
-import org.apache.spark.sql.catalyst.expressions.Alias
+import java.util.Locale
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
@@ -60,15 +63,44 @@ object EliminateView extends Rule[LogicalPlan] with
CastSupport {
// The child has the different output attributes with the View operator.
Adds a Project over
// the child of the view.
case v @ View(desc, output, child) if child.resolved &&
!v.sameOutput(child) =>
+ // Use the stored view query output column names to find the matching
attributes. The column
+ // names may have duplication, e.g. `CREATE VIEW v(x, y) AS SELECT 1
col, 2 col`. We need to
+ // make sure that the matching attributes have the same number of
duplications, and pick the
+ // corresponding attribute by ordinal.
val resolver = conf.resolver
val queryColumnNames = desc.viewQueryColumnNames
val queryOutput = if (queryColumnNames.nonEmpty) {
- // Find the attribute that has the expected attribute name from an
attribute list, the names
- // are compared using conf.resolver.
- // `CheckAnalysis` already guarantees the expected attribute can be
found for sure.
- desc.viewQueryColumnNames.map { colName =>
- child.output.find(attr => resolver(attr.name, colName)).get
+ val normalizeColName: String => String = if
(conf.caseSensitiveAnalysis) {
+ identity
+ } else {
+ _.toLowerCase(Locale.ROOT)
+ }
+ val nameToCounts = scala.collection.mutable.HashMap.empty[String, Int]
+ val nameToMatchedCols = scala.collection.mutable.HashMap.empty[String,
Seq[Attribute]]
+
+ val outputAttrs = queryColumnNames.map { colName =>
+ val normalized = normalizeColName(colName)
+ val count = nameToCounts.getOrElse(normalized, 0)
+ val matchedCols = nameToMatchedCols.getOrElseUpdate(
+ normalized, child.output.filter(attr => resolver(attr.name,
colName)))
+ if (matchedCols.length - 1 < count) {
+ throw new AnalysisException(s"The SQL query of view
${desc.identifier} has an " +
+ s"incompatible schema change and column $colName cannot be
resolved. Expect " +
+ s"more attributes named $colName in ${child.output.mkString("[",
",", "]")}")
+ }
+ nameToCounts(normalized) = count + 1
+ matchedCols(count)
}
+
+ nameToCounts.foreach { case (colName, count) =>
+ if (count > 1 && nameToMatchedCols(colName).length != count) {
+ throw new AnalysisException(s"The SQL query of view
${desc.identifier} has an " +
+ s"incompatible schema change and column $colName cannot be
resolved. Expect " +
+ s"less attributes named $colName in ${child.output.mkString("[",
",", "]")}")
+ }
+ }
+
+ outputAttrs
} else {
// For view created before Spark 2.2.0, the view text is already fully
qualified, the plan
// output is the same with the view output.
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index 0a0c3f5..68d9460 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-import org.apache.spark.sql.internal.SQLConf.MAX_NESTED_VIEW_DEPTH
+import org.apache.spark.sql.internal.SQLConf.{CASE_SENSITIVE,
MAX_NESTED_VIEW_DEPTH}
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
class SimpleSQLViewSuite extends SQLViewSuite with SharedSparkSession
@@ -721,6 +721,7 @@ abstract class SQLViewSuite extends QueryTest with
SQLTestUtils {
sql("CREATE DATABASE IF NOT EXISTS db2")
sql("USE db2")
checkAnswer(spark.table("default.v1"), Row(1))
+ sql("USE default")
}
}
}
@@ -752,4 +753,46 @@ abstract class SQLViewSuite extends QueryTest with
SQLTestUtils {
checkAnswer(sql(s"SELECT * FROM $globalTempDB.testView"), Row(2))
}
}
+
+ test("SPARK-34719: view query with duplicated output column names") {
+ Seq(true, false).foreach { caseSensitive =>
+ withSQLConf(CASE_SENSITIVE.key -> caseSensitive.toString) {
+ withView("v1", "v2") {
+ sql("CREATE VIEW v1 AS SELECT 1 a, 2 b")
+ sql("CREATE VIEW v2 AS SELECT 1 col")
+
+ sql("CREATE VIEW testView(c1, c2, c3, c4) AS SELECT *, 1 col, 2 col
FROM v1")
+ withView("testView") {
+ checkAnswer(spark.table("testView"), Seq(Row(1, 2, 1, 2)))
+
+ // One more duplicated column `COL` if caseSensitive=false.
+ sql("CREATE OR REPLACE VIEW v1 AS SELECT 1 a, 2 b, 3 COL")
+ if (caseSensitive) {
+ checkAnswer(spark.table("testView"), Seq(Row(1, 2, 1, 2)))
+ } else {
+ val e =
intercept[AnalysisException](spark.table("testView").collect())
+ assert(e.message.contains("incompatible schema change"))
+ }
+ }
+
+ // v1 has 3 columns [a, b, COL], v2 has one column [col], so
`testView2` has duplicated
+ // output column names if caseSensitive=false.
+ sql("CREATE VIEW testView2(c1, c2, c3, c4) AS SELECT * FROM v1, v2")
+ withView("testView2") {
+ checkAnswer(spark.table("testView2"), Seq(Row(1, 2, 3, 1)))
+
+ // One less duplicated column if caseSensitive=false.
+ sql("CREATE OR REPLACE VIEW v1 AS SELECT 1 a, 2 b")
+ if (caseSensitive) {
+ val e =
intercept[AnalysisException](spark.table("testView2").collect())
+ assert(e.message.contains("'COL' is not found in '(a,b,col)'"))
+ } else {
+ val e =
intercept[AnalysisException](spark.table("testView2").collect())
+ assert(e.message.contains("incompatible schema change"))
+ }
+ }
+ }
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]