Repository: spark
Updated Branches:
refs/heads/master e164a04b2 -> 3fc456694
[SPARK-16678][SPARK-16677][SQL] Fix two View-related bugs
## What changes were proposed in this pull request?
**Issue 1: Disallow Creating/Altering a View when a Table with the Same Name
Exists (without IF NOT EXISTS)**
When we create or alter a view, we check whether the view already exists. In
the current implementation, if a table with the same name exists, we treat it
as a view. However, this is not the right behavior; we should follow what Hive
does. For example:
```
hive> CREATE TABLE tab1 (id int);
OK
Time taken: 0.196 seconds
hive> CREATE OR REPLACE VIEW tab1 AS SELECT * FROM t1;
FAILED: SemanticException [Error 10218]: Existing table is not a view
The following is an existing table, not a view: default.tab1
hive> ALTER VIEW tab1 AS SELECT * FROM t1;
FAILED: SemanticException [Error 10218]: Existing table is not a view
The following is an existing table, not a view: default.tab1
hive> CREATE VIEW IF NOT EXISTS tab1 AS SELECT * FROM t1;
OK
Time taken: 0.678 seconds
```
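The fix adds a table-type check before the `replace` branch in `CreateViewCommand`. As a rough sketch of the decision order (plain Python with a dict standing in for the catalog; this is illustrative, not Spark's actual Scala code):

```python
# Hypothetical sketch of the CREATE/ALTER VIEW existence check after the fix.
# "catalog" is a plain dict mapping name -> type ("TABLE" or "VIEW"),
# not Spark's SessionCatalog; names here are illustrative only.

def check_create_view(catalog, name, allow_existing=False, replace=False):
    """Return the action to take, or raise if the target is an existing table."""
    if name not in catalog:
        return "create"
    if allow_existing:
        # CREATE VIEW IF NOT EXISTS: target exists, silently do nothing.
        return "noop"
    if catalog[name] != "VIEW":
        # New behavior: an existing table is never treated as a view.
        raise ValueError(
            "Existing table is not a view. The following is an existing table, "
            f"not a view: {name}")
    if replace:
        # CREATE OR REPLACE VIEW over an existing view.
        return "replace"
    raise ValueError(f"View {name} already exists")
```

Note that the `IF NOT EXISTS` branch is checked first, matching the Hive behavior above where `CREATE VIEW IF NOT EXISTS tab1 ...` succeeds even against a table.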
**Issue 2: Strange Error when Issuing LOAD DATA Against a View**
Users should not be allowed to issue LOAD DATA against a view. Currently, when
they do so, they get a very confusing runtime error. For example,
```SQL
LOAD DATA LOCAL INPATH "$testData" INTO TABLE $viewName
```
```
java.lang.reflect.InvocationTargetException was thrown.
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.sql.hive.client.Shim_v0_14.loadTable(HiveShim.scala:680)
```
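The fix rejects the view target up front in `LoadDataCommand`, before the Hive shim is ever reached. A minimal sketch of the guard chain (plain Python with a dict standing in for `CatalogTable`; field names are illustrative, not Spark's API):

```python
# Hypothetical sketch of LoadDataCommand's target validation after the fix.
# "meta" is None if the table does not exist; otherwise a dict standing in
# for Spark's CatalogTable metadata. Keys here are illustrative only.

def validate_load_data_target(meta):
    """Raise early with a clear message instead of a reflective Hive error."""
    if meta is None:
        raise ValueError("Target table in LOAD DATA does not exist")
    if meta.get("is_temporary"):
        raise ValueError("Target table in LOAD DATA cannot be temporary")
    if meta.get("table_type") == "VIEW":
        # The new check: fail fast with an analysis-time message rather
        # than an InvocationTargetException from the Hive shim at runtime.
        raise ValueError("Target table in LOAD DATA cannot be a view")
    if meta.get("is_datasource_table"):
        raise ValueError("LOAD DATA is not supported for datasource tables")
```

The point of ordering the view check before the datasource check is that the user sees the most specific applicable error for a view target.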
## How was this patch tested?
Added test cases.
Author: gatorsmile <[email protected]>
Closes #14314 from gatorsmile/tableDDLAgainstView.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3fc45669
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3fc45669
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3fc45669
Branch: refs/heads/master
Commit: 3fc456694151e766c551b4bc58ed7c9457777666
Parents: e164a04
Author: gatorsmile <[email protected]>
Authored: Tue Jul 26 09:32:29 2016 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Tue Jul 26 09:32:29 2016 +0800
----------------------------------------------------------------------
.../sql/catalyst/catalog/SessionCatalog.scala | 6 +-
.../spark/sql/execution/command/tables.scala | 33 ++++-----
.../spark/sql/execution/command/views.scala | 5 ++
.../spark/sql/hive/execution/SQLViewSuite.scala | 71 +++++++++++++++++++-
4 files changed, 96 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3fc45669/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 134fc4e..1856dc4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -443,12 +443,12 @@ class SessionCatalog(
}
/**
- * Return whether a table with the specified name exists.
+ * Return whether a table/view with the specified name exists.
*
- * Note: If a database is explicitly specified, then this will return whether the table
+ * Note: If a database is explicitly specified, then this will return whether the table/view
* exists in that particular database instead. In that case, even if there is a temporary
* table with the same name, we will return false if the specified database does not
- * contain the table.
+ * contain the table/view.
*/
def tableExists(name: TableIdentifier): Boolean = synchronized {
val db = formatDatabaseName(name.database.getOrElse(currentDb))
http://git-wip-us.apache.org/repos/asf/spark/blob/3fc45669/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 8f3adad..c6daa95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -202,35 +202,38 @@ case class LoadDataCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
if (!catalog.tableExists(table)) {
- throw new AnalysisException(s"Target table in LOAD DATA does not exist: '$table'")
+ throw new AnalysisException(s"Target table in LOAD DATA does not exist: $table")
}
val targetTable = catalog.getTableMetadataOption(table).getOrElse {
- throw new AnalysisException(s"Target table in LOAD DATA cannot be temporary: '$table'")
+ throw new AnalysisException(s"Target table in LOAD DATA cannot be temporary: $table")
+ }
+ if (targetTable.tableType == CatalogTableType.VIEW) {
+ throw new AnalysisException(s"Target table in LOAD DATA cannot be a view: $table")
}
if (DDLUtils.isDatasourceTable(targetTable)) {
- throw new AnalysisException(s"LOAD DATA is not supported for datasource tables: '$table'")
+ throw new AnalysisException(s"LOAD DATA is not supported for datasource tables: $table")
}
if (targetTable.partitionColumnNames.nonEmpty) {
if (partition.isEmpty) {
- throw new AnalysisException(s"LOAD DATA target table '$table' is partitioned, " +
+ throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " +
s"but no partition spec is provided")
}
if (targetTable.partitionColumnNames.size != partition.get.size) {
- throw new AnalysisException(s"LOAD DATA target table '$table' is partitioned, " +
+ throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " +
s"but number of columns in provided partition spec (${partition.get.size}) " +
s"do not match number of partitioned columns in table " +
s"(s${targetTable.partitionColumnNames.size})")
}
partition.get.keys.foreach { colName =>
if (!targetTable.partitionColumnNames.contains(colName)) {
- throw new AnalysisException(s"LOAD DATA target table '$table' is partitioned, " +
+ throw new AnalysisException(s"LOAD DATA target table $table is partitioned, " +
s"but the specified partition spec refers to a column that is not partitioned: " +
s"'$colName'")
}
}
} else {
if (partition.nonEmpty) {
- throw new AnalysisException(s"LOAD DATA target table '$table' is not partitioned, " +
+ throw new AnalysisException(s"LOAD DATA target table $table is not partitioned, " +
s"but a partition spec was provided.")
}
}
@@ -321,31 +324,31 @@ case class TruncateTableCommand(
override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
if (!catalog.tableExists(tableName)) {
- throw new AnalysisException(s"Table '$tableName' in TRUNCATE TABLE does not exist.")
+ throw new AnalysisException(s"Table $tableName in TRUNCATE TABLE does not exist.")
}
if (catalog.isTemporaryTable(tableName)) {
throw new AnalysisException(
- s"Operation not allowed: TRUNCATE TABLE on temporary tables: '$tableName'")
+ s"Operation not allowed: TRUNCATE TABLE on temporary tables: $tableName")
}
val table = catalog.getTableMetadata(tableName)
if (table.tableType == CatalogTableType.EXTERNAL) {
throw new AnalysisException(
- s"Operation not allowed: TRUNCATE TABLE on external tables: '$tableName'")
+ s"Operation not allowed: TRUNCATE TABLE on external tables: $tableName")
}
if (table.tableType == CatalogTableType.VIEW) {
throw new AnalysisException(
- s"Operation not allowed: TRUNCATE TABLE on views: '$tableName'")
+ s"Operation not allowed: TRUNCATE TABLE on views: $tableName")
}
val isDatasourceTable = DDLUtils.isDatasourceTable(table)
if (isDatasourceTable && partitionSpec.isDefined) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
- s"for tables created using the data sources API: '$tableName'")
+ s"for tables created using the data sources API: $tableName")
}
if (table.partitionColumnNames.isEmpty && partitionSpec.isDefined) {
throw new AnalysisException(
s"Operation not allowed: TRUNCATE TABLE ... PARTITION is not supported " +
- s"for tables that are not partitioned: '$tableName'")
+ s"for tables that are not partitioned: $tableName")
}
val locations =
if (isDatasourceTable) {
@@ -366,7 +369,7 @@ case class TruncateTableCommand(
} catch {
case NonFatal(e) =>
throw new AnalysisException(
- s"Failed to truncate table '$tableName' when removing data of the path: $path " +
+ s"Failed to truncate table $tableName when removing data of the path: $path " +
s"because of ${e.toString}")
}
}
@@ -379,7 +382,7 @@ case class TruncateTableCommand(
spark.sharedState.cacheManager.uncacheQuery(spark.table(tableName.quotedString))
} catch {
case NonFatal(e) =>
- log.warn(s"Exception when attempting to uncache table '$tableName'", e)
+ log.warn(s"Exception when attempting to uncache table $tableName", e)
}
Seq.empty[Row]
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3fc45669/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 312a1f6..901a9b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -113,9 +113,14 @@ case class CreateViewCommand(
val qualifiedName = name.copy(database = Option(database))
if (sessionState.catalog.tableExists(qualifiedName)) {
+ val tableMetadata = sessionState.catalog.getTableMetadata(qualifiedName)
if (allowExisting) {
// Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
// already exists.
+ } else if (tableMetadata.tableType != CatalogTableType.VIEW) {
+ throw new AnalysisException(
+ "Existing table is not a view. The following is an existing table, " +
+ s"not a view: $qualifiedName")
} else if (replace) {
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
http://git-wip-us.apache.org/repos/asf/spark/blob/3fc45669/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
index 82dc64a..6a80664 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.hive.test.TestHiveSingleton
-import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
/**
@@ -55,6 +54,76 @@ class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
+ test("error handling: existing a table with the duplicate name when creating/altering a view") {
+ withTable("tab1") {
+ sql("CREATE TABLE tab1 (id int)")
+ var e = intercept[AnalysisException] {
+ sql("CREATE OR REPLACE VIEW tab1 AS SELECT * FROM jt")
+ }.getMessage
+ assert(e.contains("The following is an existing table, not a view: `default`.`tab1`"))
+ e = intercept[AnalysisException] {
+ sql("CREATE VIEW tab1 AS SELECT * FROM jt")
+ }.getMessage
+ assert(e.contains("The following is an existing table, not a view: `default`.`tab1`"))
+ e = intercept[AnalysisException] {
+ sql("ALTER VIEW tab1 AS SELECT * FROM jt")
+ }.getMessage
+ assert(e.contains("The following is an existing table, not a view: `default`.`tab1`"))
+ }
+ }
+
+ test("existing a table with the duplicate name when CREATE VIEW IF NOT EXISTS") {
+ withTable("tab1") {
+ sql("CREATE TABLE tab1 (id int)")
+ sql("CREATE VIEW IF NOT EXISTS tab1 AS SELECT * FROM jt")
+ checkAnswer(sql("select count(*) FROM tab1"), Row(0))
+ }
+ }
+
+ test("error handling: insert/load/truncate table commands against a temp view") {
+ val viewName = "testView"
+ withTempView(viewName) {
+ sql(s"CREATE TEMPORARY VIEW $viewName AS SELECT id FROM jt")
+ var e = intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $viewName SELECT 1")
+ }.getMessage
+ assert(e.contains("Inserting into an RDD-based table is not allowed"))
+
+ val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
+ e = intercept[AnalysisException] {
+ sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE $viewName""")
+ }.getMessage
+ assert(e.contains(s"Target table in LOAD DATA cannot be temporary: `$viewName`"))
+
+ e = intercept[AnalysisException] {
+ sql(s"TRUNCATE TABLE $viewName")
+ }.getMessage
+ assert(e.contains(s"Operation not allowed: TRUNCATE TABLE on temporary tables: `$viewName`"))
+ }
+ }
+
+ test("error handling: insert/load/truncate table commands against a view") {
+ val viewName = "testView"
+ withView(viewName) {
+ sql(s"CREATE VIEW $viewName AS SELECT id FROM jt")
+ var e = intercept[AnalysisException] {
+ sql(s"INSERT INTO TABLE $viewName SELECT 1")
+ }.getMessage
+ assert(e.contains("Inserting into an RDD-based table is not allowed"))
+
+ val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
+ e = intercept[AnalysisException] {
+ sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE $viewName""")
+ }.getMessage
+ assert(e.contains(s"Target table in LOAD DATA cannot be a view: `$viewName`"))
+
+ e = intercept[AnalysisException] {
+ sql(s"TRUNCATE TABLE $viewName")
+ }.getMessage
+ assert(e.contains(s"Operation not allowed: TRUNCATE TABLE on views: `$viewName`"))
+ }
+ }
+
test("error handling: fail if the view sql itself is invalid") {
// A table that does not exist
intercept[AnalysisException] {