This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 8e97c5c [SPARK-30885][SQL][FOLLOW-UP] Fix issues where some V1
commands allow tables that are not fully qualified
8e97c5c is described below
commit 8e97c5c9fab37703aa98a4564884280884e2228b
Author: Terry Kim <[email protected]>
AuthorDate: Wed Mar 4 18:09:48 2020 +0800
[SPARK-30885][SQL][FOLLOW-UP] Fix issues where some V1 commands allow
tables that are not fully qualified
### What changes were proposed in this pull request?
There are a few V1 commands such as `REFRESH TABLE` that still allow
`spark_catalog.t` because they run the commands with parsed table names without
trying to load them in the catalog. This PR addresses this issue.
The PR also addresses the issue brought up in
https://github.com/apache/spark/pull/27642#discussion_r382402104.
### Why are the changes needed?
To fix a bug where for some V1 commands, `spark_catalog.t` is allowed.
### Does this PR introduce any user-facing change?
Yes, a bug is fixed and `REFRESH TABLE spark_catalog.t` is not allowed.
### How was this patch tested?
Added new test.
Closes #27718 from imback82/fix_TempViewOrV1Table.
Authored-by: Terry Kim <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit b30278107fa34723f5a93b412b4ff21d6e3bbef0)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../sql/connector/catalog/LookupCatalog.scala | 7 ++++
.../catalyst/analysis/ResolveSessionCatalog.scala | 14 +------
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 44 +++++++++++++++++-----
.../execution/command/PlanResolutionSuite.scala | 42 +++++++++++----------
4 files changed, 65 insertions(+), 42 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
index b0b9d7b..10c1574 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.connector.catalog
import org.apache.spark.internal.Logging
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -56,8 +57,14 @@ private[sql] trait LookupCatalog extends Logging {
* Extract session catalog and identifier from a multi-part identifier.
*/
object SessionCatalogAndIdentifier {
+ import
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
+
def unapply(parts: Seq[String]): Option[(CatalogPlugin, Identifier)] =
parts match {
case CatalogAndIdentifier(catalog, ident) if
CatalogV2Util.isSessionCatalog(catalog) =>
+ if (ident.namespace.length != 1) {
+ throw new AnalysisException(
+ s"The namespace in session catalog must have exactly one name
part: ${parts.quoted}")
+ }
Some(catalog, ident)
case _ => None
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index baaa24f..7950b61 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -572,11 +572,6 @@ class ResolveSessionCatalog(
FunctionIdentifier(nameParts.head, None)
} else {
ident.namespace match {
- // For name parts like `spark_catalog.t`, we need to fill in the
default database so
- // that the caller side won't treat it as a temp function.
- case Array() if nameParts.head ==
CatalogManager.SESSION_CATALOG_NAME =>
- FunctionIdentifier(
- ident.name,
Some(catalogManager.v1SessionCatalog.getCurrentDatabase))
case Array(db) => FunctionIdentifier(ident.name, Some(db))
case _ =>
throw new AnalysisException(s"Unsupported function name
'$ident'")
@@ -642,14 +637,7 @@ class ResolveSessionCatalog(
object TempViewOrV1Table {
def unapply(nameParts: Seq[String]): Option[Seq[String]] = nameParts match
{
case _ if isTempView(nameParts) => Some(nameParts)
- case SessionCatalogAndTable(_, tbl) =>
- if (nameParts.head == CatalogManager.SESSION_CATALOG_NAME &&
tbl.length == 1) {
- // For name parts like `spark_catalog.t`, we need to fill in the
default database so
- // that the caller side won't treat it as a temp view.
- Some(Seq(catalogManager.v1SessionCatalog.getCurrentDatabase,
tbl.head))
- } else {
- Some(tbl)
- }
+ case SessionCatalogAndIdentifier(_, tbl) =>
Some(tbl.asMultipartIdentifier)
case _ => None
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index bccdce7..1fc0bb1 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -2090,7 +2090,8 @@ class DataSourceV2SQLSuite
val e1 = intercept[AnalysisException] {
sql("DESCRIBE FUNCTION default.ns1.ns2.fun")
}
- assert(e1.message.contains("Unsupported function name
'default.ns1.ns2.fun'"))
+ assert(e1.message.contains(
+ "The namespace in session catalog must have exactly one name part:
default.ns1.ns2.fun"))
}
test("SHOW FUNCTIONS not valid v1 namespace") {
@@ -2109,9 +2110,10 @@ class DataSourceV2SQLSuite
assert(e.message.contains("DROP FUNCTION is only supported in v1 catalog"))
val e1 = intercept[AnalysisException] {
- sql("DESCRIBE FUNCTION default.ns1.ns2.fun")
+ sql("DROP FUNCTION default.ns1.ns2.fun")
}
- assert(e1.message.contains("Unsupported function name
'default.ns1.ns2.fun'"))
+ assert(e1.message.contains(
+ "The namespace in session catalog must have exactly one name part:
default.ns1.ns2.fun"))
}
test("CREATE FUNCTION: only support session catalog") {
@@ -2123,7 +2125,8 @@ class DataSourceV2SQLSuite
val e1 = intercept[AnalysisException] {
sql("CREATE FUNCTION default.ns1.ns2.fun as 'f'")
}
- assert(e1.message.contains("Unsupported function name
'default.ns1.ns2.fun'"))
+ assert(e1.message.contains(
+ "The namespace in session catalog must have exactly one name part:
default.ns1.ns2.fun"))
}
test("global temp view should not be masked by v2 catalog") {
@@ -2160,7 +2163,8 @@ class DataSourceV2SQLSuite
// the session catalog, not the `gloabl_temp` v2 catalog.
sql(s"CREATE TABLE $globalTempDB.ns1.ns2.tbl (id bigint, data string)
USING json")
}
- assert(e.message.contains("global_temp.ns1.ns2.tbl is not a valid
TableIdentifier"))
+ assert(e.message.contains(
+ "The namespace in session catalog must have exactly one name part:
global_temp.ns1.ns2.tbl"))
}
test("table name same as catalog can be used") {
@@ -2187,10 +2191,31 @@ class DataSourceV2SQLSuite
def assertWrongTableIdent(): Unit = {
withTable("t") {
sql("CREATE TABLE t USING json AS SELECT 1 AS i")
- val e = intercept[AnalysisException] {
- sql("select * from spark_catalog.t")
+
+ val t = "spark_catalog.t"
+ def verify(sql: String): Unit = {
+ val e = intercept[AnalysisException](spark.sql(sql))
+ assert(e.message.contains(
+ s"The namespace in session catalog must have exactly one name
part: $t"))
}
- assert(e.message.contains("Table or view not found: spark_catalog.t"))
+
+ verify(s"select * from $t")
+ // Verify V1 commands that bypass table lookups.
+ verify(s"REFRESH TABLE $t")
+ verify(s"DESCRIBE $t i")
+ verify(s"DROP TABLE $t")
+ verify(s"DROP VIEW $t")
+ verify(s"ANALYZE TABLE $t COMPUTE STATISTICS")
+ verify(s"ANALYZE TABLE $t COMPUTE STATISTICS FOR ALL COLUMNS")
+ verify(s"MSCK REPAIR TABLE $t")
+ verify(s"LOAD DATA INPATH 'filepath' INTO TABLE $t")
+ verify(s"SHOW CREATE TABLE $t")
+ verify(s"SHOW CREATE TABLE $t AS SERDE")
+ verify(s"CACHE TABLE $t")
+ verify(s"UNCACHE TABLE $t")
+ verify(s"TRUNCATE TABLE $t")
+ verify(s"SHOW PARTITIONS $t")
+ verify(s"SHOW COLUMNS FROM $t")
}
}
@@ -2334,7 +2359,8 @@ class DataSourceV2SQLSuite
val e1 = intercept[AnalysisException](
sql(s"CACHE TABLE $sessionCatalogName.v")
)
- assert(e1.message.contains("Table or view not found: default.v"))
+ assert(e1.message.contains(
+ "The namespace in session catalog must have exactly one name part:
spark_catalog.v"))
}
val e2 = intercept[AnalysisException] {
sql(s"CREATE TEMP VIEW $sessionCatalogName.v AS SELECT 1")
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 01d16ce..9df3c33 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -128,7 +128,7 @@ class PlanResolutionSuite extends AnalysisTest {
}
})
when(manager.currentCatalog).thenReturn(v2SessionCatalog)
- when(manager.currentNamespace).thenReturn(Array.empty[String])
+ when(manager.currentNamespace).thenReturn(Array("default"))
when(manager.v1SessionCatalog).thenReturn(v1SessionCatalog)
manager
}
@@ -171,7 +171,7 @@ class PlanResolutionSuite extends AnalysisTest {
"USING parquet PARTITIONED BY (a)"
val expectedTableDesc = CatalogTable(
- identifier = TableIdentifier("my_tab"),
+ identifier = TableIdentifier("my_tab", Some("default")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty,
schema = new StructType()
@@ -215,7 +215,7 @@ class PlanResolutionSuite extends AnalysisTest {
"CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"
val expectedTableDesc = CatalogTable(
- identifier = TableIdentifier("my_tab"),
+ identifier = TableIdentifier("my_tab", Some("default")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty,
schema = new StructType().add("a", IntegerType).add("b", StringType),
@@ -236,7 +236,7 @@ class PlanResolutionSuite extends AnalysisTest {
val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT
'abc'"
val expectedTableDesc = CatalogTable(
- identifier = TableIdentifier("my_tab"),
+ identifier = TableIdentifier("my_tab", Some("default")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty,
schema = new StructType().add("a", IntegerType).add("b", StringType),
@@ -256,7 +256,7 @@ class PlanResolutionSuite extends AnalysisTest {
val sql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet
TBLPROPERTIES('test' = 'test')"
val expectedTableDesc = CatalogTable(
- identifier = TableIdentifier("my_tab"),
+ identifier = TableIdentifier("my_tab", Some("default")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty,
schema = new StructType().add("a", IntegerType).add("b", StringType),
@@ -276,7 +276,7 @@ class PlanResolutionSuite extends AnalysisTest {
val v1 = "CREATE TABLE my_tab(a INT, b STRING) USING parquet LOCATION
'/tmp/file'"
val expectedTableDesc = CatalogTable(
- identifier = TableIdentifier("my_tab"),
+ identifier = TableIdentifier("my_tab", Some("default")),
tableType = CatalogTableType.EXTERNAL,
storage = CatalogStorageFormat.empty.copy(locationUri = Some(new
URI("/tmp/file"))),
schema = new StructType().add("a", IntegerType).add("b", StringType),
@@ -330,7 +330,7 @@ class PlanResolutionSuite extends AnalysisTest {
""".stripMargin
val expectedTableDesc = CatalogTable(
- identifier = TableIdentifier("table_name"),
+ identifier = TableIdentifier("table_name", Some("default")),
tableType = CatalogTableType.MANAGED,
storage = CatalogStorageFormat.empty.copy(
properties = Map("a" -> "1", "b" -> "0.1", "c" -> "true")
@@ -621,7 +621,7 @@ class PlanResolutionSuite extends AnalysisTest {
val tableName1 = "db.tab"
val tableIdent1 = TableIdentifier("tab", Option("db"))
val tableName2 = "tab"
- val tableIdent2 = TableIdentifier("tab", None)
+ val tableIdent2 = TableIdentifier("tab", Some("default"))
parseResolveCompare(s"DROP TABLE $tableName1",
DropTableCommand(tableIdent1, ifExists = false, isView = false, purge =
false))
@@ -657,7 +657,7 @@ class PlanResolutionSuite extends AnalysisTest {
val viewName1 = "db.view"
val viewIdent1 = TableIdentifier("view", Option("db"))
val viewName2 = "view"
- val viewIdent2 = TableIdentifier("view")
+ val viewIdent2 = TableIdentifier("view", Option("default"))
parseResolveCompare(s"DROP VIEW $viewName1",
DropTableCommand(viewIdent1, ifExists = false, isView = true, purge =
false))
@@ -688,7 +688,7 @@ class PlanResolutionSuite extends AnalysisTest {
val parsed2_view = parseAndResolve(sql2_view)
val parsed3_view = parseAndResolve(sql3_view)
- val tableIdent = TableIdentifier("table_name", None)
+ val tableIdent = TableIdentifier("table_name", Some("default"))
val expected1_view = AlterTableSetPropertiesCommand(
tableIdent, Map("test" -> "test", "comment" -> "new_comment"), isView =
true)
val expected2_view = AlterTableUnsetPropertiesCommand(
@@ -715,8 +715,8 @@ class PlanResolutionSuite extends AnalysisTest {
val parsed2 = parseAndResolve(sql2)
val parsed3 = parseAndResolve(sql3)
- val tableIdent = TableIdentifier(tblName, None)
if (useV1Command) {
+ val tableIdent = TableIdentifier(tblName, Some("default"))
val expected1 = AlterTableSetPropertiesCommand(
tableIdent, Map("test" -> "test", "comment" -> "new_comment"),
isView = false)
val expected2 = AlterTableUnsetPropertiesCommand(
@@ -781,7 +781,7 @@ class PlanResolutionSuite extends AnalysisTest {
val parsed = parseAndResolve(sql)
if (useV1Command) {
val expected = AlterTableSetPropertiesCommand(
- TableIdentifier(tblName),
+ TableIdentifier(tblName, Some("default")),
Map("a" -> "1", "b" -> "0.1", "c" -> "true"),
isView = false)
@@ -806,7 +806,7 @@ class PlanResolutionSuite extends AnalysisTest {
val parsed = parseAndResolve(sql)
if (useV1Command) {
val expected = AlterTableSetLocationCommand(
- TableIdentifier(tblName, None),
+ TableIdentifier(tblName, Some("default")),
None,
"new location")
comparePlans(parsed, expected)
@@ -828,8 +828,10 @@ class PlanResolutionSuite extends AnalysisTest {
val parsed1 = parseAndResolve(sql1)
val parsed2 = parseAndResolve(sql2)
if (useV1Command) {
- val expected1 = DescribeTableCommand(TableIdentifier(tblName, None),
Map.empty, false)
- val expected2 = DescribeTableCommand(TableIdentifier(tblName, None),
Map.empty, true)
+ val expected1 = DescribeTableCommand(
+ TableIdentifier(tblName, Some("default")), Map.empty, false)
+ val expected2 = DescribeTableCommand(
+ TableIdentifier(tblName, Some("default")), Map.empty, true)
comparePlans(parsed1, expected1)
comparePlans(parsed2, expected2)
@@ -851,7 +853,7 @@ class PlanResolutionSuite extends AnalysisTest {
val parsed3 = parseAndResolve(sql3)
if (useV1Command) {
val expected3 = DescribeTableCommand(
- TableIdentifier(tblName, None), Map("a" -> "1"), false)
+ TableIdentifier(tblName, Some("default")), Map("a" -> "1"), false)
comparePlans(parsed3, expected3)
} else {
parsed3 match {
@@ -1019,8 +1021,8 @@ class PlanResolutionSuite extends AnalysisTest {
val parsed1 = parseAndResolve(sql1)
val parsed2 = parseAndResolve(sql2)
- val tableIdent = TableIdentifier(tblName, None)
if (useV1Command) {
+ val tableIdent = TableIdentifier(tblName, Some("default"))
val oldColumn = StructField("i", IntegerType)
val newColumn = StructField("i", LongType)
val expected1 = AlterTableChangeColumnCommand(
@@ -1093,7 +1095,7 @@ class PlanResolutionSuite extends AnalysisTest {
} else {
val actual = parseAndResolve(sql)
val expected = AlterTableChangeColumnCommand(
- TableIdentifier(tblName, None),
+ TableIdentifier(tblName, Some("default")),
"I",
StructField("I", IntegerType).withComment("new comment"))
comparePlans(actual, expected)
@@ -1125,7 +1127,7 @@ class PlanResolutionSuite extends AnalysisTest {
}
val DSV2ResolutionTests = {
- val v2SessionCatalogTable =
s"${CatalogManager.SESSION_CATALOG_NAME}.v2Table"
+ val v2SessionCatalogTable =
s"${CatalogManager.SESSION_CATALOG_NAME}.default.v2Table"
Seq(
("ALTER TABLE testcat.tab ALTER COLUMN i TYPE bigint", false),
("ALTER TABLE tab ALTER COLUMN i TYPE bigint", false),
@@ -1141,7 +1143,7 @@ class PlanResolutionSuite extends AnalysisTest {
(s"SHOW TBLPROPERTIES $v2SessionCatalogTable", true),
("SELECT * from tab", false),
("SELECT * from testcat.tab", false),
- (s"SELECT * from ${CatalogManager.SESSION_CATALOG_NAME}.v2Table", true)
+ (s"SELECT * from $v2SessionCatalogTable", true)
)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]