Repository: spark Updated Branches: refs/heads/master 671bc08ed -> 3755da76c
[SPARK-19331][SQL][TESTS] Improve the test coverage of SQLViewSuite Move `SQLViewSuite` from `sql/hive` to `sql/core`, so we can test the view supports without hive metastore. Also moved the test cases that specified to hive to `HiveSQLViewSuite`. Improve the test coverage of SQLViewSuite, cover the following cases: 1. view resolution(possibly a referenced table/view have changed after the view creation); 2. handle a view with user specified column names; 3. improve the test cases for a nested view. Also added a test case for cyclic view reference, which is a known issue that is not fixed yet. N/A Author: jiangxingbo <jiangxb1...@gmail.com> Closes #16674 from jiangxb1987/view-test. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3755da76 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3755da76 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3755da76 Branch: refs/heads/master Commit: 3755da76c3821e8e6a4f359c43243a51a06559ca Parents: 671bc08 Author: jiangxingbo <jiangxb1...@gmail.com> Authored: Wed Feb 15 10:46:54 2017 -0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Wed Feb 15 10:47:11 2017 -0800 ---------------------------------------------------------------------- .../spark/sql/catalyst/analysis/Analyzer.scala | 11 +- .../catalyst/analysis/NoSuchItemException.scala | 2 +- .../spark/sql/execution/SQLViewSuite.scala | 620 +++++++++++++++ .../spark/sql/test/SharedSQLContext.scala | 11 +- .../sql/hive/execution/HiveSQLViewSuite.scala | 140 ++++ .../spark/sql/hive/execution/SQLViewSuite.scala | 762 ------------------- 6 files changed, 773 insertions(+), 773 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3755da76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 6aa0e8d..cd517a9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -620,13 +620,18 @@ class Analyzer( private def lookupTableFromCatalog( u: UnresolvedRelation, defaultDatabase: Option[String] = None): LogicalPlan = { + val tableIdentWithDb = u.tableIdentifier.copy( + database = u.tableIdentifier.database.orElse(defaultDatabase)) try { - val tableIdentWithDb = u.tableIdentifier.copy( - database = u.tableIdentifier.database.orElse(defaultDatabase)) catalog.lookupRelation(tableIdentWithDb, u.alias) } catch { case _: NoSuchTableException => - u.failAnalysis(s"Table or view not found: ${u.tableName}") + u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}") + // If the database is defined and that database is not found, throw an AnalysisException. + // Note that if the database is not defined, it is possible we are looking up a temp view. + case e: NoSuchDatabaseException => + u.failAnalysis(s"Table or view not found: ${tableIdentWithDb.unquotedString}, the " + + s"database ${e.db} doesn't exsits.") } } http://git-wip-us.apache.org/repos/asf/spark/blob/3755da76/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala index 8febdca..f5aae60 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec * Thrown by a catalog when an item cannot be found. The analyzer will rethrow the exception * as an [[org.apache.spark.sql.AnalysisException]] with the correct position information. */ -class NoSuchDatabaseException(db: String) extends AnalysisException(s"Database '$db' not found") +class NoSuchDatabaseException(val db: String) extends AnalysisException(s"Database '$db' not found") class NoSuchTableException(db: String, table: String) extends AnalysisException(s"Table or view '$table' not found in database '$db'") http://git-wip-us.apache.org/repos/asf/spark/blob/3755da76/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala new file mode 100644 index 0000000..2d95cb6 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala @@ -0,0 +1,620 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils} + +class SimpleSQLViewSuite extends SQLViewSuite with SharedSQLContext + +/** + * A suite for testing view related functionality. + */ +abstract class SQLViewSuite extends QueryTest with SQLTestUtils { + import testImplicits._ + + protected override def beforeAll(): Unit = { + super.beforeAll() + // Create a simple table with two columns: id and id1 + spark.range(1, 10).selectExpr("id", "id id1").write.format("json").saveAsTable("jt") + } + + protected override def afterAll(): Unit = { + try { + spark.sql(s"DROP TABLE IF EXISTS jt") + } finally { + super.afterAll() + } + } + + test("create a permanent view on a permanent view") { + withView("jtv1", "jtv2") { + sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3") + sql("CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6") + checkAnswer(sql("select count(*) FROM jtv2"), Row(2)) + } + } + + test("create a temp view on a permanent view") { + withView("jtv1", "temp_jtv1") { + sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3") + sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jtv1 WHERE id < 6") + checkAnswer(sql("select count(*) FROM temp_jtv1"), Row(2)) + } + } + + test("create a temp view on a temp view") { + withView("temp_jtv1", "temp_jtv2") { + sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jt WHERE id > 3") + sql("CREATE TEMPORARY VIEW temp_jtv2 AS SELECT * FROM temp_jtv1 WHERE id < 6") + checkAnswer(sql("select count(*) FROM temp_jtv2"), Row(2)) + } + } + + test("create a permanent view on a temp view") { + withView("jtv1", "temp_jtv1", "global_temp_jtv1") { + sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jt WHERE id > 3") + var e = intercept[AnalysisException] { + sql("CREATE VIEW jtv1 AS SELECT * FROM temp_jtv1 WHERE id < 6") + }.getMessage + assert(e.contains("Not allowed to create a permanent view `jtv1` by " + + "referencing a temporary view `temp_jtv1`")) + + val globalTempDB = spark.sharedState.globalTempViewManager.database + sql("CREATE GLOBAL TEMP VIEW global_temp_jtv1 AS SELECT * FROM jt WHERE id > 0") + e = intercept[AnalysisException] { + sql(s"CREATE VIEW jtv1 AS SELECT * FROM $globalTempDB.global_temp_jtv1 WHERE id < 6") + }.getMessage + assert(e.contains(s"Not allowed to create a permanent view `jtv1` by referencing " + + s"a temporary view `global_temp`.`global_temp_jtv1`")) + } + } + + test("error handling: existing a table with the duplicate name when creating/altering a view") { + withTable("tab1") { + sql("CREATE TABLE tab1 (id int) USING parquet") + var e = intercept[AnalysisException] { + sql("CREATE OR REPLACE VIEW tab1 AS SELECT * FROM jt") + }.getMessage + assert(e.contains("`tab1` is not a view")) + e = intercept[AnalysisException] { + sql("CREATE VIEW tab1 AS SELECT * FROM jt") + }.getMessage + assert(e.contains("`tab1` is not a view")) + e = intercept[AnalysisException] { + sql("ALTER VIEW tab1 AS SELECT * FROM jt") + }.getMessage + assert(e.contains("`tab1` is not a view")) + } + } + + test("existing a table with the duplicate name when CREATE VIEW IF NOT EXISTS") { + withTable("tab1") { + sql("CREATE TABLE tab1 (id int) USING parquet") + sql("CREATE VIEW IF NOT EXISTS tab1 AS SELECT * FROM jt") + checkAnswer(sql("select count(*) FROM tab1"), Row(0)) + } + } + + test("Issue exceptions for ALTER VIEW on the temporary view") { + val viewName = "testView" + withTempView(viewName) { + spark.range(10).createTempView(viewName) + assertNoSuchTable(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'an')") + assertNoSuchTable(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')") + } + } + + test("Issue exceptions for ALTER TABLE on the temporary view") { + val viewName = "testView" + withTempView(viewName) { + spark.range(10).createTempView(viewName) + assertNoSuchTable(s"ALTER TABLE $viewName SET SERDE 'whatever'") + assertNoSuchTable(s"ALTER TABLE $viewName PARTITION (a=1, b=2) SET SERDE 'whatever'") + assertNoSuchTable(s"ALTER TABLE $viewName SET SERDEPROPERTIES ('p' = 'an')") + assertNoSuchTable(s"ALTER TABLE $viewName SET LOCATION '/path/to/your/lovely/heart'") + assertNoSuchTable(s"ALTER TABLE $viewName PARTITION (a='4') SET LOCATION '/path/to/home'") + assertNoSuchTable(s"ALTER TABLE $viewName ADD IF NOT EXISTS PARTITION (a='4', b='8')") + assertNoSuchTable(s"ALTER TABLE $viewName DROP PARTITION (a='4', b='8')") + assertNoSuchTable(s"ALTER TABLE $viewName PARTITION (a='4') RENAME TO PARTITION (a='5')") + assertNoSuchTable(s"ALTER TABLE $viewName RECOVER PARTITIONS") + } + } + + test("Issue exceptions for other table DDL on the temporary view") { + val viewName = "testView" + withTempView(viewName) { + spark.range(10).createTempView(viewName) + + val e = intercept[AnalysisException] { + sql(s"INSERT INTO TABLE $viewName SELECT 1") + }.getMessage + assert(e.contains("Inserting into an RDD-based table is not allowed")) + + val dataFilePath = + Thread.currentThread().getContextClassLoader.getResource("data/files/employee.dat") + assertNoSuchTable(s"""LOAD DATA LOCAL INPATH "$dataFilePath" INTO TABLE $viewName""") + assertNoSuchTable(s"TRUNCATE TABLE $viewName") + assertNoSuchTable(s"SHOW CREATE TABLE $viewName") + assertNoSuchTable(s"SHOW PARTITIONS $viewName") + assertNoSuchTable(s"ANALYZE TABLE $viewName COMPUTE STATISTICS") + assertNoSuchTable(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id") + } + } + + private def assertNoSuchTable(query: String): Unit = { + intercept[NoSuchTableException] { + sql(query) + } + } + + test("error handling: insert/load/truncate table commands against a view") { + val viewName = "testView" + withView(viewName) { + sql(s"CREATE VIEW $viewName AS SELECT id FROM jt") + var e = intercept[AnalysisException] { + sql(s"INSERT INTO TABLE $viewName SELECT 1") + }.getMessage + assert(e.contains("Inserting into an RDD-based table is not allowed")) + + val dataFilePath = + Thread.currentThread().getContextClassLoader.getResource("data/files/employee.dat") + e = intercept[AnalysisException] { + sql(s"""LOAD DATA LOCAL INPATH "$dataFilePath" INTO TABLE $viewName""") + }.getMessage + assert(e.contains(s"Target table in LOAD DATA cannot be a view: `default`.`testview`")) + + e = intercept[AnalysisException] { + sql(s"TRUNCATE TABLE $viewName") + }.getMessage + assert(e.contains(s"Operation not allowed: TRUNCATE TABLE on views: `default`.`testview`")) + } + } + + test("error handling: fail if the view sql itself is invalid") { + // A database that does not exist + assertInvalidReference("CREATE OR REPLACE VIEW myabcdview AS SELECT * FROM db_not_exist234.jt") + + // A table that does not exist + assertInvalidReference("CREATE OR REPLACE VIEW myabcdview AS SELECT * FROM table_not_exist345") + + // A column that does not exist + intercept[AnalysisException] { + sql("CREATE OR REPLACE VIEW myabcdview AS SELECT random1234 FROM jt").collect() + } + } + + private def assertInvalidReference(query: String): Unit = { + val e = intercept[AnalysisException] { + sql(query) + }.getMessage + assert(e.contains("Table or view not found")) + } + + + test("error handling: fail if the temp view name contains the database prefix") { + // Fully qualified table name like "database.table" is not allowed for temporary view + val e = intercept[AnalysisException] { + sql("CREATE OR REPLACE TEMPORARY VIEW default.myabcdview AS SELECT * FROM jt") + } + assert(e.message.contains("It is not allowed to add database prefix")) + } + + test("error handling: disallow IF NOT EXISTS for CREATE TEMPORARY VIEW") { + val e = intercept[AnalysisException] { + sql("CREATE TEMPORARY VIEW IF NOT EXISTS myabcdview AS SELECT * FROM jt") + } + assert(e.message.contains("It is not allowed to define a TEMPORARY view with IF NOT EXISTS")) + } + + test("error handling: fail if the temp view sql itself is invalid") { + // A database that does not exist + assertInvalidReference( + "CREATE OR REPLACE TEMPORARY VIEW myabcdview AS SELECT * FROM db_not_exist234.jt") + + // A table that does not exist + assertInvalidReference( + "CREATE OR REPLACE TEMPORARY VIEW myabcdview AS SELECT * FROM table_not_exist1345") + + // A column that does not exist, for temporary view + intercept[AnalysisException] { + sql("CREATE OR REPLACE TEMPORARY VIEW myabcdview AS SELECT random1234 FROM jt") + } + } + + test("correctly parse CREATE VIEW statement") { + withView("testView") { + sql( + """CREATE VIEW IF NOT EXISTS + |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla') + |TBLPROPERTIES ('a' = 'b') + |AS SELECT * FROM jt + |""".stripMargin) + checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i))) + } + } + + test("correctly parse a nested view") { + withTempDatabase { db => + withView("view1", "view2", s"$db.view3") { + sql("CREATE VIEW view1(x, y) AS SELECT * FROM jt") + + // Create a nested view in the same database. + sql("CREATE VIEW view2(id, id1) AS SELECT * FROM view1") + checkAnswer(sql("SELECT * FROM view2 ORDER BY id"), (1 to 9).map(i => Row(i, i))) + + // Create a nested view in a different database. + activateDatabase(db) { + sql(s"CREATE VIEW $db.view3(id, id1) AS SELECT * FROM default.view1") + checkAnswer(sql("SELECT * FROM view3 ORDER BY id"), (1 to 9).map(i => Row(i, i))) + } + } + } + } + + test("correctly parse CREATE TEMPORARY VIEW statement") { + withView("testView") { + sql( + """CREATE TEMPORARY VIEW + |testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla') + |TBLPROPERTIES ('a' = 'b') + |AS SELECT * FROM jt + |""".stripMargin) + checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i))) + } + } + + test("should NOT allow CREATE TEMPORARY VIEW when TEMPORARY VIEW with same name exists") { + withView("testView") { + sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt") + + val e = intercept[AnalysisException] { + sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt") + } + + assert(e.message.contains("Temporary table") && e.message.contains("already exists")) + } + } + + test("should allow CREATE TEMPORARY VIEW when a permanent VIEW with same name exists") { + withView("testView", "default.testView") { + sql("CREATE VIEW testView AS SELECT id FROM jt") + sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt") + } + } + + test("should allow CREATE permanent VIEW when a TEMPORARY VIEW with same name exists") { + withView("testView", "default.testView") { + sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt") + sql("CREATE VIEW testView AS SELECT id FROM jt") + } + } + + test("correctly handle CREATE VIEW IF NOT EXISTS") { + withTable("jt2") { + withView("testView") { + sql("CREATE VIEW testView AS SELECT id FROM jt") + + val df = (1 until 10).map(i => i -> i).toDF("i", "j") + df.write.format("json").saveAsTable("jt2") + sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2") + + // make sure our view doesn't change. + checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i))) + } + } + } + + test(s"correctly handle CREATE OR REPLACE TEMPORARY VIEW") { + withTable("jt2") { + withView("testView") { + sql("CREATE OR REPLACE TEMPORARY VIEW testView AS SELECT id FROM jt") + checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i))) + + sql("CREATE OR REPLACE TEMPORARY VIEW testView AS SELECT id AS i, id AS j FROM jt") + // make sure the view has been changed. + checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i))) + } + } + } + + test("correctly handle CREATE OR REPLACE VIEW") { + withTable("jt2") { + sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt") + checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i))) + + val df = (1 until 10).map(i => i -> i).toDF("i", "j") + df.write.format("json").saveAsTable("jt2") + sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2") + // make sure the view has been changed. + checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i))) + + sql("DROP VIEW testView") + + val e = intercept[AnalysisException] { + sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt") + } + assert(e.message.contains( + "CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed")) + } + } + + test("correctly handle ALTER VIEW") { + withTable("jt2") { + withView("testView") { + sql("CREATE VIEW testView AS SELECT id FROM jt") + + val df = (1 until 10).map(i => i -> i).toDF("i", "j") + df.write.format("json").saveAsTable("jt2") + sql("ALTER VIEW testView AS SELECT * FROM jt2") + // make sure the view has been changed. + checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i))) + } + } + } + + test("correctly handle ALTER VIEW on a referenced view") { + withView("view1", "view2") { + sql("CREATE VIEW view1(x, y) AS SELECT * FROM jt") + + // Create a nested view. + sql("CREATE VIEW view2(id, id1) AS SELECT * FROM view1") + checkAnswer(sql("SELECT * FROM view2 ORDER BY id"), (1 to 9).map(i => Row(i, i))) + + // Alter the referenced view. + sql("ALTER VIEW view1 AS SELECT id AS x, id1 + 1 As y FROM jt") + checkAnswer(sql("SELECT * FROM view2 ORDER BY id"), (1 to 9).map(i => Row(i, i + 1))) + } + } + + test("should not allow ALTER VIEW AS when the view does not exist") { + assertNoSuchTable("ALTER VIEW testView AS SELECT 1, 2") + assertNoSuchTable("ALTER VIEW default.testView AS SELECT 1, 2") + } + + test("ALTER VIEW AS should try to alter temp view first if view name has no database part") { + withView("test_view") { + withTempView("test_view") { + sql("CREATE VIEW test_view AS SELECT 1 AS a, 2 AS b") + sql("CREATE TEMP VIEW test_view AS SELECT 1 AS a, 2 AS b") + + sql("ALTER VIEW test_view AS SELECT 3 AS i, 4 AS j") + + // The temporary view should be updated. + checkAnswer(spark.table("test_view"), Row(3, 4)) + + // The permanent view should stay same. + checkAnswer(spark.table("default.test_view"), Row(1, 2)) + } + } + } + + test("ALTER VIEW AS should alter permanent view if view name has database part") { + withView("test_view") { + withTempView("test_view") { + sql("CREATE VIEW test_view AS SELECT 1 AS a, 2 AS b") + sql("CREATE TEMP VIEW test_view AS SELECT 1 AS a, 2 AS b") + + sql("ALTER VIEW default.test_view AS SELECT 3 AS i, 4 AS j") + + // The temporary view should stay same. + checkAnswer(spark.table("test_view"), Row(1, 2)) + + // The permanent view should be updated. + checkAnswer(spark.table("default.test_view"), Row(3, 4)) + } + } + } + + test("ALTER VIEW AS should keep the previous table properties, comment, create_time, etc.") { + withView("test_view") { + sql( + """ + |CREATE VIEW test_view + |COMMENT 'test' + |TBLPROPERTIES ('key' = 'a') + |AS SELECT 1 AS a, 2 AS b + """.stripMargin) + + val catalog = spark.sessionState.catalog + val viewMeta = catalog.getTableMetadata(TableIdentifier("test_view")) + assert(viewMeta.comment == Some("test")) + assert(viewMeta.properties("key") == "a") + + sql("ALTER VIEW test_view AS SELECT 3 AS i, 4 AS j") + val updatedViewMeta = catalog.getTableMetadata(TableIdentifier("test_view")) + assert(updatedViewMeta.comment == Some("test")) + assert(updatedViewMeta.properties("key") == "a") + assert(updatedViewMeta.createTime == viewMeta.createTime) + // The view should be updated. + checkAnswer(spark.table("test_view"), Row(3, 4)) + } + } + + test("create view for json table") { + // json table is not hive-compatible, make sure the new flag fix it. + withView("testView") { + sql("CREATE VIEW testView AS SELECT id FROM jt") + checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i))) + } + } + + test("create view for partitioned parquet table") { + // partitioned parquet table is not hive-compatible, make sure the new flag fix it. + withTable("parTable") { + withView("testView") { + val df = Seq(1 -> "a").toDF("i", "j") + df.write.format("parquet").partitionBy("i").saveAsTable("parTable") + sql("CREATE VIEW testView AS SELECT i, j FROM parTable") + checkAnswer(sql("SELECT * FROM testView"), Row(1, "a")) + } + } + } + + test("create view for joined tables") { + // make sure the new flag can handle some complex cases like join and schema change. + withTable("jt1", "jt2") { + spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1") + spark.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2") + withView("testView") { + sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2") + checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i))) + + val df = (1 until 10).map(i => i -> i).toDF("id1", "newCol") + df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("jt1") + checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i))) + } + } + } + + test("CTE within view") { + withView("cte_view") { + sql("CREATE VIEW cte_view AS WITH w AS (SELECT 1 AS n) SELECT n FROM w") + checkAnswer(sql("SELECT * FROM cte_view"), Row(1)) + } + } + + test("Using view after switching current database") { + withView("v") { + sql("CREATE VIEW v AS SELECT * FROM jt") + withTempDatabase { db => + activateDatabase(db) { + // Should look up table `jt` in database `default`. + checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.jt")) + + // The new `jt` table shouldn't be scanned. + sql("CREATE TABLE jt(key INT, value STRING) USING parquet") + checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.jt")) + } + } + } + } + + test("Using view after adding more columns") { + withTable("add_col") { + spark.range(10).write.saveAsTable("add_col") + withView("v") { + sql("CREATE VIEW v AS SELECT * FROM add_col") + spark.range(10).select('id, 'id as 'a).write.mode("overwrite").saveAsTable("add_col") + checkAnswer(sql("SELECT * FROM v"), spark.range(10).toDF()) + } + } + } + + test("error handling: fail if the referenced table or view is invalid") { + withView("view1", "view2", "view3") { + // Fail if the referenced table is defined in a invalid database. + withTempDatabase { db => + withTable(s"$db.table1") { + activateDatabase(db) { + sql("CREATE TABLE table1(a int, b string) USING parquet") + sql("CREATE VIEW default.view1 AS SELECT * FROM table1") + } + } + } + assertInvalidReference("SELECT * FROM view1") + + // Fail if the referenced table is invalid. + withTable("table2") { + sql("CREATE TABLE table2(a int, b string) USING parquet") + sql("CREATE VIEW view2 AS SELECT * FROM table2") + } + assertInvalidReference("SELECT * FROM view2") + + // Fail if the referenced view is invalid. + withView("testView") { + sql("CREATE VIEW testView AS SELECT * FROM jt") + sql("CREATE VIEW view3 AS SELECT * FROM testView") + } + assertInvalidReference("SELECT * FROM view3") + } + } + + test("correctly resolve a view in a self join") { + withView("testView") { + sql("CREATE VIEW testView AS SELECT * FROM jt") + checkAnswer( + sql("SELECT * FROM testView t1 JOIN testView t2 ON t1.id = t2.id ORDER BY t1.id"), + (1 to 9).map(i => Row(i, i, i, i))) + } + } + + test("correctly handle a view with custom column names") { + withTable("tab1") { + spark.range(1, 10).selectExpr("id", "id + 1 id1").write.saveAsTable("tab1") + withView("testView", "testView2") { + sql("CREATE VIEW testView(x, y) AS SELECT * FROM tab1") + + // Correctly resolve a view with custom column names. + checkAnswer(sql("SELECT * FROM testView ORDER BY x"), (1 to 9).map(i => Row(i, i + 1))) + + // Throw an AnalysisException if the number of columns don't match up. + val e = intercept[AnalysisException] { + sql("CREATE VIEW testView2(x, y, z) AS SELECT * FROM tab1") + }.getMessage + assert(e.contains("The number of columns produced by the SELECT clause (num: `2`) does " + + "not match the number of column names specified by CREATE VIEW (num: `3`).")) + + // Correctly resolve a view when the referenced table schema changes. + spark.range(1, 10).selectExpr("id", "id + id dummy", "id + 1 id1") + .write.mode(SaveMode.Overwrite).saveAsTable("tab1") + checkAnswer(sql("SELECT * FROM testView ORDER BY x"), (1 to 9).map(i => Row(i, i + 1))) + + // Throw an AnalysisException if the column name is not found. + spark.range(1, 10).selectExpr("id", "id + 1 dummy") + .write.mode(SaveMode.Overwrite).saveAsTable("tab1") + intercept[AnalysisException](sql("SELECT * FROM testView")) + } + } + } + + test("resolve a view when the dataTypes of referenced table columns changed") { + withTable("tab1") { + spark.range(1, 10).selectExpr("id", "id + 1 id1").write.saveAsTable("tab1") + withView("testView") { + sql("CREATE VIEW testView AS SELECT * FROM tab1") + + // Allow casting from IntegerType to LongType + val df = (1 until 10).map(i => (i, i + 1)).toDF("id", "id1") + df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("tab1") + checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i + 1))) + + // Casting from DoubleType to LongType might truncate, throw an AnalysisException. + val df2 = (1 until 10).map(i => (i.toDouble, i.toDouble)).toDF("id", "id1") + df2.write.format("json").mode(SaveMode.Overwrite).saveAsTable("tab1") + intercept[AnalysisException](sql("SELECT * FROM testView")) + + // Can't cast from ArrayType to LongType, throw an AnalysisException. + val df3 = (1 until 10).map(i => (i, Seq(i))).toDF("id", "id1") + df3.write.format("json").mode(SaveMode.Overwrite).saveAsTable("tab1") + intercept[AnalysisException](sql("SELECT * FROM testView")) + } + } + } + + // TODO: Check for cyclic view references on ALTER VIEW. + ignore("correctly handle a cyclic view reference") { + withView("view1", "view2") { + sql("CREATE VIEW view1 AS SELECT * FROM jt") + sql("CREATE VIEW view2 AS SELECT * FROM view1") + intercept[AnalysisException](sql("ALTER VIEW view1 AS SELECT * FROM view2")) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/3755da76/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala index 36dc236..e122b39 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSQLContext.scala @@ -70,13 +70,10 @@ trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach { * Stop the underlying [[org.apache.spark.SparkContext]], if any. */ protected override def afterAll(): Unit = { - try { - if (_spark != null) { - _spark.stop() - _spark = null - } - } finally { - super.afterAll() + super.afterAll() + if (_spark != null) { + _spark.stop() + _spark = null } } http://git-wip-us.apache.org/repos/asf/spark/blob/3755da76/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSQLViewSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSQLViewSuite.scala new file mode 100644 index 0000000..5afb37b --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSQLViewSuite.scala @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} +import org.apache.spark.sql.execution.SQLViewSuite +import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton} +import org.apache.spark.sql.types.StructType + +/** + * A test suite for Hive view related functionality. + */ +class HiveSQLViewSuite extends SQLViewSuite with TestHiveSingleton { + protected override val spark: SparkSession = TestHive.sparkSession + + import testImplicits._ + + test("create a permanent/temp view using a hive, built-in, and permanent user function") { + val permanentFuncName = "myUpper" + val permanentFuncClass = + classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper].getCanonicalName + val builtInFuncNameInLowerCase = "abs" + val builtInFuncNameInMixedCase = "aBs" + val hiveFuncName = "histogram_numeric" + + withUserDefinedFunction(permanentFuncName -> false) { + sql(s"CREATE FUNCTION $permanentFuncName AS '$permanentFuncClass'") + withTable("tab1") { + (1 to 10).map(i => (s"$i", i)).toDF("str", "id").write.saveAsTable("tab1") + Seq("VIEW", "TEMPORARY VIEW").foreach { viewMode => + withView("view1") { + sql( + s""" + |CREATE $viewMode view1 + |AS SELECT + |$permanentFuncName(str), + |$builtInFuncNameInLowerCase(id), + |$builtInFuncNameInMixedCase(id) as aBs, + |$hiveFuncName(id, 5) over() + |FROM tab1 + """.stripMargin) + checkAnswer(sql("select count(*) FROM view1"), Row(10)) + } + } + } + } + } + + test("create a permanent/temp view using a temporary function") { + val tempFunctionName = "temp" + val functionClass = + classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper].getCanonicalName + withUserDefinedFunction(tempFunctionName -> true) { + sql(s"CREATE TEMPORARY FUNCTION $tempFunctionName AS '$functionClass'") + withView("view1", "tempView1") { + withTable("tab1") { + (1 to 10).map(i => s"$i").toDF("id").write.saveAsTable("tab1") + + // temporary view + sql(s"CREATE TEMPORARY VIEW tempView1 AS SELECT $tempFunctionName(id) from tab1") + checkAnswer(sql("select count(*) FROM tempView1"), Row(10)) + + // permanent view + val e = intercept[AnalysisException] { + sql(s"CREATE VIEW view1 AS SELECT $tempFunctionName(id) from tab1") + }.getMessage + assert(e.contains("Not allowed to create a permanent view `view1` by referencing " + + s"a temporary function `$tempFunctionName`")) + } + } + } + } + + test("SPARK-14933 - create view from hive parquet table") { + withTable("t_part") { + withView("v_part") { + spark.sql("create table t_part stored as parquet as select 1 as a, 2 as b") + spark.sql("create view v_part as select * from t_part") + checkAnswer( + sql("select * from t_part"), + sql("select * from v_part")) + } + } + } + + test("SPARK-14933 - create view from hive orc table") { + withTable("t_orc") { + withView("v_orc") { + spark.sql("create table t_orc stored as orc as select 1 as a, 2 as b") + spark.sql("create view v_orc as select * from t_orc") + checkAnswer( + sql("select * from t_orc"), + sql("select * from v_orc")) + } + } + } + + test("make sure we can resolve view created by old version of Spark") { + withTable("hive_table") { + withView("old_view") { + spark.sql("CREATE TABLE hive_table AS SELECT 1 AS a, 2 AS b") + // The views defined by older versions of Spark(before 2.2) will have empty view default + // database name, and all the relations referenced in the viewText will have database part + // defined. + val view = CatalogTable( + identifier = TableIdentifier("old_view"), + tableType = CatalogTableType.VIEW, + storage = CatalogStorageFormat.empty, + schema = new StructType().add("a", "int").add("b", "int"), + viewText = Some("SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b` FROM (SELECT " + + "`gen_attr_0`, `gen_attr_1` FROM (SELECT `a` AS `gen_attr_0`, `b` AS " + + "`gen_attr_1` FROM hive_table) AS gen_subquery_0) AS hive_table") + ) + hiveContext.sessionState.catalog.createTable(view, ignoreIfExists = false) + val df = sql("SELECT * FROM old_view") + // Check the output rows. + checkAnswer(df, Row(1, 2)) + // Check the output schema. + assert(df.schema.sameType(view.schema)) + } + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/3755da76/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala deleted file mode 100644 index 958001e..0000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLViewSuite.scala +++ /dev/null @@ -1,762 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.execution - -import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType} -import org.apache.spark.sql.hive.test.TestHiveSingleton -import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.sql.types.StructType - -/** - * A suite for testing view related functionality. - */ -class SQLViewSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { - import spark.implicits._ - - override def beforeAll(): Unit = { - // Create a simple table with two columns: id and id1 - spark.range(1, 10).selectExpr("id", "id id1").write.format("json").saveAsTable("jt") - } - - override def afterAll(): Unit = { - spark.sql(s"DROP TABLE IF EXISTS jt") - } - - test("create a permanent view on a permanent view") { - withView("jtv1", "jtv2") { - sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3") - sql("CREATE VIEW jtv2 AS SELECT * FROM jtv1 WHERE id < 6") - checkAnswer(sql("select count(*) FROM jtv2"), Row(2)) - } - } - - test("create a temp view on a permanent view") { - withView("jtv1", "temp_jtv1") { - sql("CREATE VIEW jtv1 AS SELECT * FROM jt WHERE id > 3") - sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jtv1 WHERE id < 6") - checkAnswer(sql("select count(*) FROM temp_jtv1"), Row(2)) - } - } - - test("create a temp view on a temp view") { - withView("temp_jtv1", "temp_jtv2") { - sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jt WHERE id > 3") - sql("CREATE TEMPORARY VIEW temp_jtv2 AS SELECT * FROM temp_jtv1 WHERE id < 6") - checkAnswer(sql("select count(*) FROM temp_jtv2"), Row(2)) - } - } - - test("create a permanent view on a temp view") { - withView("jtv1", "temp_jtv1", "global_temp_jtv1") { - sql("CREATE TEMPORARY VIEW temp_jtv1 AS SELECT * FROM jt WHERE id > 3") - var e = intercept[AnalysisException] { - sql("CREATE VIEW jtv1 AS SELECT * FROM temp_jtv1 WHERE id < 6") - }.getMessage - assert(e.contains("Not allowed to create a permanent view `jtv1` by " + - "referencing a temporary view `temp_jtv1`")) - - val globalTempDB = spark.sharedState.globalTempViewManager.database - sql("CREATE GLOBAL TEMP VIEW global_temp_jtv1 AS SELECT * FROM jt WHERE id > 0") - e = intercept[AnalysisException] { - sql(s"CREATE VIEW jtv1 AS SELECT * FROM $globalTempDB.global_temp_jtv1 WHERE id < 6") - }.getMessage - assert(e.contains(s"Not allowed to create a permanent view `jtv1` by referencing " + - s"a temporary view `global_temp`.`global_temp_jtv1`")) - } - } - - test("error handling: existing a table with the duplicate name when creating/altering a view") { - withTable("tab1") { - sql("CREATE TABLE tab1 (id int)") - var e = intercept[AnalysisException] { - sql("CREATE OR REPLACE VIEW tab1 AS SELECT * FROM jt") - }.getMessage - assert(e.contains("`tab1` is not a view")) - e = intercept[AnalysisException] { - sql("CREATE VIEW tab1 AS SELECT * FROM jt") - }.getMessage - assert(e.contains("`tab1` is not a view")) - e = intercept[AnalysisException] { - sql("ALTER VIEW tab1 AS SELECT * FROM jt") - }.getMessage - assert(e.contains("`tab1` is not a view")) - } - } - - test("existing a table with the duplicate name when CREATE VIEW IF NOT EXISTS") { - withTable("tab1") { - sql("CREATE TABLE tab1 (id int)") - sql("CREATE VIEW IF NOT EXISTS tab1 AS SELECT * FROM jt") - checkAnswer(sql("select count(*) FROM tab1"), Row(0)) - } - } - - test("Issue exceptions for ALTER VIEW on the temporary view") { - val viewName = "testView" - withTempView(viewName) { - spark.range(10).createTempView(viewName) - assertNoSuchTable(s"ALTER VIEW $viewName SET TBLPROPERTIES ('p' = 'an')") - assertNoSuchTable(s"ALTER VIEW $viewName UNSET TBLPROPERTIES ('p')") - } - } - - test("Issue exceptions for ALTER TABLE on the temporary view") { - val viewName = "testView" - withTempView(viewName) { - spark.range(10).createTempView(viewName) - assertNoSuchTable(s"ALTER TABLE $viewName SET SERDE 'whatever'") - assertNoSuchTable(s"ALTER TABLE $viewName PARTITION (a=1, b=2) SET SERDE 'whatever'") - assertNoSuchTable(s"ALTER TABLE $viewName SET SERDEPROPERTIES ('p' = 'an')") - assertNoSuchTable(s"ALTER TABLE $viewName SET LOCATION '/path/to/your/lovely/heart'") - assertNoSuchTable(s"ALTER TABLE $viewName PARTITION (a='4') SET LOCATION '/path/to/home'") - assertNoSuchTable(s"ALTER TABLE $viewName ADD IF NOT EXISTS PARTITION (a='4', b='8')") - assertNoSuchTable(s"ALTER TABLE $viewName DROP PARTITION (a='4', b='8')") - assertNoSuchTable(s"ALTER TABLE $viewName PARTITION (a='4') RENAME TO PARTITION (a='5')") - assertNoSuchTable(s"ALTER TABLE $viewName RECOVER PARTITIONS") - } - } - - test("Issue exceptions for other table DDL on the temporary view") { - val viewName = "testView" - withTempView(viewName) { - spark.range(10).createTempView(viewName) - - val e = intercept[AnalysisException] { - sql(s"INSERT INTO TABLE $viewName SELECT 1") - }.getMessage - assert(e.contains("Inserting into an RDD-based table is not allowed")) - - val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath - assertNoSuchTable(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE $viewName""") - assertNoSuchTable(s"TRUNCATE TABLE $viewName") - assertNoSuchTable(s"SHOW CREATE TABLE $viewName") - assertNoSuchTable(s"SHOW PARTITIONS $viewName") - assertNoSuchTable(s"ANALYZE TABLE $viewName COMPUTE STATISTICS") - assertNoSuchTable(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id") - } - } - - private def assertNoSuchTable(query: String): Unit = { - intercept[NoSuchTableException] { - sql(query) - } - } - - test("error handling: insert/load/truncate table commands against a view") { - val viewName = "testView" - withView(viewName) { - sql(s"CREATE VIEW $viewName AS SELECT id FROM jt") - var e = intercept[AnalysisException] { - sql(s"INSERT INTO TABLE $viewName SELECT 1") - }.getMessage - assert(e.contains("Inserting into an RDD-based table is not allowed")) - - val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath - e = intercept[AnalysisException] { - sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE $viewName""") - }.getMessage - assert(e.contains(s"Target table in LOAD DATA cannot be a view: `default`.`testview`")) - - e = intercept[AnalysisException] { - sql(s"TRUNCATE TABLE $viewName") - }.getMessage - assert(e.contains(s"Operation not allowed: TRUNCATE TABLE on views: `default`.`testview`")) - } - } - - test("error handling: fail if the view sql itself is invalid") { - // A table that does not exist - intercept[AnalysisException] { - sql("CREATE OR REPLACE VIEW myabcdview AS SELECT * FROM table_not_exist1345").collect() - } - - // A column that does not exist - intercept[AnalysisException] { - sql("CREATE OR REPLACE VIEW myabcdview AS SELECT random1234 FROM jt").collect() - } - } - - test("error handling: fail if the temp view name contains the database prefix") { - // Fully qualified table name like "database.table" is not allowed for temporary view - val e = intercept[AnalysisException] { - sql("CREATE OR REPLACE TEMPORARY VIEW default.myabcdview AS SELECT * FROM jt") - } - assert(e.message.contains("It is not allowed to add database prefix")) - } - - test("error handling: disallow IF NOT EXISTS for CREATE TEMPORARY VIEW") { - val e = intercept[AnalysisException] { - sql("CREATE TEMPORARY VIEW IF NOT EXISTS myabcdview AS SELECT * FROM jt") - } - assert(e.message.contains("It is not allowed to define a TEMPORARY view with IF NOT EXISTS")) - } - - test("error handling: fail if the temp view sql itself is invalid") { - // A table that does not exist for temporary view - intercept[AnalysisException] { - sql("CREATE OR REPLACE TEMPORARY VIEW myabcdview AS SELECT * FROM table_not_exist1345") - } - - // A column that does not exist, for temporary view - intercept[AnalysisException] { - sql("CREATE OR REPLACE TEMPORARY VIEW myabcdview AS SELECT random1234 FROM jt") - } - } - - test("correctly parse CREATE VIEW statement") { - withView("testView") { - sql( - """CREATE VIEW IF NOT EXISTS - |default.testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla') - |TBLPROPERTIES ('a' = 'b') - |AS SELECT * FROM jt - |""".stripMargin) - checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i))) - } - } - - test("correctly parse CREATE TEMPORARY VIEW statement") { - withView("testView") { - sql( - """CREATE TEMPORARY VIEW - |testView (c1 COMMENT 'blabla', c2 COMMENT 'blabla') - |TBLPROPERTIES ('a' = 'b') - |AS SELECT * FROM jt - |""".stripMargin) - checkAnswer(sql("SELECT c1, c2 FROM testView ORDER BY c1"), (1 to 9).map(i => Row(i, i))) - } - } - - test("should NOT allow CREATE TEMPORARY VIEW when TEMPORARY VIEW with same name exists") { - withView("testView") { - sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt") - - val e = intercept[AnalysisException] { - sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt") - } - - assert(e.message.contains("Temporary table") && e.message.contains("already exists")) - } - } - - test("should allow CREATE TEMPORARY VIEW when a permanent VIEW with same name exists") { - withView("testView", "default.testView") { - sql("CREATE VIEW testView AS SELECT id FROM jt") - sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt") - } - } - - test("should allow CREATE permanent VIEW when a TEMPORARY VIEW with same name exists") { - withView("testView", "default.testView") { - sql("CREATE TEMPORARY VIEW testView AS SELECT id FROM jt") - sql("CREATE VIEW testView AS SELECT id FROM jt") - } - } - - test("correctly handle CREATE VIEW IF NOT EXISTS") { - withTable("jt2") { - withView("testView") { - sql("CREATE VIEW testView AS SELECT id FROM jt") - - val df = (1 until 10).map(i => i -> i).toDF("i", "j") - df.write.format("json").saveAsTable("jt2") - sql("CREATE VIEW IF NOT EXISTS testView AS SELECT * FROM jt2") - - // make sure our view doesn't change. - checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i))) - } - } - } - - test(s"correctly handle CREATE OR REPLACE TEMPORARY VIEW") { - withTable("jt2") { - withView("testView") { - sql("CREATE OR REPLACE TEMPORARY VIEW testView AS SELECT id FROM jt") - checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i))) - - sql("CREATE OR REPLACE TEMPORARY VIEW testView AS SELECT id AS i, id AS j FROM jt") - // make sure the view has been changed. - checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i))) - } - } - } - - test("correctly handle CREATE OR REPLACE VIEW") { - withTable("jt2") { - sql("CREATE OR REPLACE VIEW testView AS SELECT id FROM jt") - checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i))) - - val df = (1 until 10).map(i => i -> i).toDF("i", "j") - df.write.format("json").saveAsTable("jt2") - sql("CREATE OR REPLACE VIEW testView AS SELECT * FROM jt2") - // make sure the view has been changed. - checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i))) - - sql("DROP VIEW testView") - - val e = intercept[AnalysisException] { - sql("CREATE OR REPLACE VIEW IF NOT EXISTS testView AS SELECT id FROM jt") - } - assert(e.message.contains( - "CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed")) - } - } - - test("correctly handle ALTER VIEW") { - withTable("jt2") { - withView("testView") { - sql("CREATE VIEW testView AS SELECT id FROM jt") - - val df = (1 until 10).map(i => i -> i).toDF("i", "j") - df.write.format("json").saveAsTable("jt2") - sql("ALTER VIEW testView AS SELECT * FROM jt2") - // make sure the view has been changed. - checkAnswer(sql("SELECT * FROM testView ORDER BY i"), (1 to 9).map(i => Row(i, i))) - } - } - } - - test("should not allow ALTER VIEW AS when the view does not exist") { - assertNoSuchTable("ALTER VIEW testView AS SELECT 1, 2") - assertNoSuchTable("ALTER VIEW default.testView AS SELECT 1, 2") - } - - test("ALTER VIEW AS should try to alter temp view first if view name has no database part") { - withView("test_view") { - withTempView("test_view") { - sql("CREATE VIEW test_view AS SELECT 1 AS a, 2 AS b") - sql("CREATE TEMP VIEW test_view AS SELECT 1 AS a, 2 AS b") - - sql("ALTER VIEW test_view AS SELECT 3 AS i, 4 AS j") - - // The temporary view should be updated. - checkAnswer(spark.table("test_view"), Row(3, 4)) - - // The permanent view should stay same. - checkAnswer(spark.table("default.test_view"), Row(1, 2)) - } - } - } - - test("ALTER VIEW AS should alter permanent view if view name has database part") { - withView("test_view") { - withTempView("test_view") { - sql("CREATE VIEW test_view AS SELECT 1 AS a, 2 AS b") - sql("CREATE TEMP VIEW test_view AS SELECT 1 AS a, 2 AS b") - - sql("ALTER VIEW default.test_view AS SELECT 3 AS i, 4 AS j") - - // The temporary view should stay same. - checkAnswer(spark.table("test_view"), Row(1, 2)) - - // The permanent view should be updated. - checkAnswer(spark.table("default.test_view"), Row(3, 4)) - } - } - } - - test("ALTER VIEW AS should keep the previous table properties, comment, create_time, etc.") { - withView("test_view") { - sql( - """ - |CREATE VIEW test_view - |COMMENT 'test' - |TBLPROPERTIES ('key' = 'a') - |AS SELECT 1 AS a, 2 AS b - """.stripMargin) - - val catalog = spark.sessionState.catalog - val viewMeta = catalog.getTableMetadata(TableIdentifier("test_view")) - assert(viewMeta.comment == Some("test")) - assert(viewMeta.properties("key") == "a") - - sql("ALTER VIEW test_view AS SELECT 3 AS i, 4 AS j") - val updatedViewMeta = catalog.getTableMetadata(TableIdentifier("test_view")) - assert(updatedViewMeta.comment == Some("test")) - assert(updatedViewMeta.properties("key") == "a") - assert(updatedViewMeta.createTime == viewMeta.createTime) - // The view should be updated. - checkAnswer(spark.table("test_view"), Row(3, 4)) - } - } - - test("create hive view for json table") { - // json table is not hive-compatible, make sure the new flag fix it. - withView("testView") { - sql("CREATE VIEW testView AS SELECT id FROM jt") - checkAnswer(sql("SELECT * FROM testView ORDER BY id"), (1 to 9).map(i => Row(i))) - } - } - - test("create hive view for partitioned parquet table") { - // partitioned parquet table is not hive-compatible, make sure the new flag fix it. - withTable("parTable") { - withView("testView") { - val df = Seq(1 -> "a").toDF("i", "j") - df.write.format("parquet").partitionBy("i").saveAsTable("parTable") - sql("CREATE VIEW testView AS SELECT i, j FROM parTable") - checkAnswer(sql("SELECT * FROM testView"), Row(1, "a")) - } - } - } - - test("CTE within view") { - withView("cte_view") { - sql("CREATE VIEW cte_view AS WITH w AS (SELECT 1 AS n) SELECT n FROM w") - checkAnswer(sql("SELECT * FROM cte_view"), Row(1)) - } - } - - test("Using view after switching current database") { - withView("v") { - sql("CREATE VIEW v AS SELECT * FROM src") - withTempDatabase { db => - activateDatabase(db) { - // Should look up table `src` in database `default`. - checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src")) - - // The new `src` table shouldn't be scanned. - sql("CREATE TABLE src(key INT, value STRING)") - checkAnswer(sql("SELECT * FROM default.v"), sql("SELECT * FROM default.src")) - } - } - } - } - - test("Using view after adding more columns") { - withTable("add_col") { - spark.range(10).write.saveAsTable("add_col") - withView("v") { - sql("CREATE VIEW v AS SELECT * FROM add_col") - spark.range(10).select('id, 'id as 'a).write.mode("overwrite").saveAsTable("add_col") - checkAnswer(sql("SELECT * FROM v"), spark.range(10).toDF()) - } - } - } - - test("create hive view for joined tables") { - // make sure the new flag can handle some complex cases like join and schema change. - withTable("jt1", "jt2") { - spark.range(1, 10).toDF("id1").write.format("json").saveAsTable("jt1") - spark.range(1, 10).toDF("id2").write.format("json").saveAsTable("jt2") - sql("CREATE VIEW testView AS SELECT * FROM jt1 JOIN jt2 ON id1 == id2") - checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i))) - - val df = (1 until 10).map(i => i -> i).toDF("id1", "newCol") - df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("jt1") - checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i))) - - sql("DROP VIEW testView") - } - } - - test("SPARK-14933 - create view from hive parquet table") { - withTable("t_part") { - withView("v_part") { - spark.sql("create table t_part stored as parquet as select 1 as a, 2 as b") - spark.sql("create view v_part as select * from t_part") - checkAnswer( - sql("select * from t_part"), - sql("select * from v_part")) - } - } - } - - test("SPARK-14933 - create view from hive orc table") { - withTable("t_orc") { - withView("v_orc") { - spark.sql("create table t_orc stored as orc as select 1 as a, 2 as b") - spark.sql("create view v_orc as select * from t_orc") - checkAnswer( - sql("select * from t_orc"), - sql("select * from v_orc")) - } - } - } - - test("create a permanent/temp view using a hive, built-in, and permanent user function") { - val permanentFuncName = "myUpper" - val permanentFuncClass = - classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper].getCanonicalName - val builtInFuncNameInLowerCase = "abs" - val builtInFuncNameInMixedCase = "aBs" - val hiveFuncName = "histogram_numeric" - - withUserDefinedFunction(permanentFuncName -> false) { - sql(s"CREATE FUNCTION $permanentFuncName AS '$permanentFuncClass'") - withTable("tab1") { - (1 to 10).map(i => (s"$i", i)).toDF("str", "id").write.saveAsTable("tab1") - Seq("VIEW", "TEMPORARY VIEW").foreach { viewMode => - withView("view1") { - sql( - s""" - |CREATE $viewMode view1 - |AS SELECT - |$permanentFuncName(str), - |$builtInFuncNameInLowerCase(id), - |$builtInFuncNameInMixedCase(id) as aBs, - |$hiveFuncName(id, 5) over() - |FROM tab1 - """.stripMargin) - checkAnswer(sql("select count(*) FROM view1"), Row(10)) - } - } - } - } - } - - test("create a permanent/temp view using a temporary function") { - val tempFunctionName = "temp" - val functionClass = - classOf[org.apache.hadoop.hive.ql.udf.generic.GenericUDFUpper].getCanonicalName - withUserDefinedFunction(tempFunctionName -> true) { - sql(s"CREATE TEMPORARY FUNCTION $tempFunctionName AS '$functionClass'") - withView("view1", "tempView1") { - withTable("tab1") { - (1 to 10).map(i => s"$i").toDF("id").write.saveAsTable("tab1") - - // temporary view - sql(s"CREATE TEMPORARY VIEW tempView1 AS SELECT $tempFunctionName(id) from tab1") - checkAnswer(sql("select count(*) FROM tempView1"), Row(10)) - - // permanent view - val e = intercept[AnalysisException] { - sql(s"CREATE VIEW view1 AS SELECT $tempFunctionName(id) from tab1") - }.getMessage - assert(e.contains("Not allowed to create a permanent view `view1` by referencing " + - s"a temporary function `$tempFunctionName`")) - } - } - } - } - - test("correctly resolve a nested view") { - withTempDatabase { db => - withView(s"$db.view1", s"$db.view2") { - val view1 = CatalogTable( - identifier = TableIdentifier("view1", Some(db)), - tableType = CatalogTableType.VIEW, - storage = CatalogStorageFormat.empty, - schema = new StructType().add("x", "long").add("y", "long"), - viewText = Some("SELECT * FROM jt"), - properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default", - CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2", - s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id", - s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1")) - val view2 = CatalogTable( - identifier = TableIdentifier("view2", Some(db)), - tableType = CatalogTableType.VIEW, - storage = CatalogStorageFormat.empty, - schema = new StructType().add("id", "long").add("id1", "long"), - viewText = Some("SELECT * FROM view1"), - properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> db, - CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2", - s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "x", - s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "y")) - activateDatabase(db) { - hiveContext.sessionState.catalog.createTable(view1, ignoreIfExists = false) - hiveContext.sessionState.catalog.createTable(view2, ignoreIfExists = false) - checkAnswer(sql("SELECT * FROM view2 ORDER BY id"), (1 to 9).map(i => Row(i, i))) - } - } - } - } - - test("correctly resolve a view with CTE") { - withView("cte_view") { - val cte_view = CatalogTable( - identifier = TableIdentifier("cte_view"), - tableType = CatalogTableType.VIEW, - storage = CatalogStorageFormat.empty, - schema = new StructType().add("n", "int"), - viewText = Some("WITH w AS (SELECT 1 AS n) SELECT n FROM w"), - properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default", - CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "1", - s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "n")) - hiveContext.sessionState.catalog.createTable(cte_view, ignoreIfExists = false) - checkAnswer(sql("SELECT * FROM cte_view"), Row(1)) - } - } - - test("correctly resolve a view in a self join") { - withView("join_view") { - val join_view = CatalogTable( - identifier = TableIdentifier("join_view"), - tableType = CatalogTableType.VIEW, - storage = CatalogStorageFormat.empty, - schema = new StructType().add("id", "long").add("id1", "long"), - viewText = Some("SELECT * FROM jt"), - properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default", - CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2", - s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id", - s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1")) - hiveContext.sessionState.catalog.createTable(join_view, ignoreIfExists = false) - checkAnswer( - sql("SELECT * FROM join_view t1 JOIN join_view t2 ON t1.id = t2.id ORDER BY t1.id"), - (1 to 9).map(i => Row(i, i, i, i))) - } - } - - private def assertInvalidReference(query: String): Unit = { - val e = intercept[AnalysisException] { - sql(query) - }.getMessage - assert(e.contains("Table or view not found")) - } - - test("error handling: fail if the referenced table or view is invalid") { - withView("view1", "view2", "view3") { - // Fail if the referenced table is defined in a invalid database. - val view1 = CatalogTable( - identifier = TableIdentifier("view1"), - tableType = CatalogTableType.VIEW, - storage = CatalogStorageFormat.empty, - schema = new StructType().add("id", "long").add("id1", "long"), - viewText = Some("SELECT * FROM invalid_db.jt"), - properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default", - CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2", - s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id", - s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1")) - hiveContext.sessionState.catalog.createTable(view1, ignoreIfExists = false) - assertInvalidReference("SELECT * FROM view1") - - // Fail if the referenced table is invalid. - val view2 = CatalogTable( - identifier = TableIdentifier("view2"), - tableType = CatalogTableType.VIEW, - storage = CatalogStorageFormat.empty, - schema = new StructType().add("id", "long").add("id1", "long"), - viewText = Some("SELECT * FROM invalid_table"), - properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default", - CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2", - s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id", - s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1")) - hiveContext.sessionState.catalog.createTable(view2, ignoreIfExists = false) - assertInvalidReference("SELECT * FROM view2") - - // Fail if the referenced view is invalid. - val view3 = CatalogTable( - identifier = TableIdentifier("view3"), - tableType = CatalogTableType.VIEW, - storage = CatalogStorageFormat.empty, - schema = new StructType().add("id", "long").add("id1", "long"), - viewText = Some("SELECT * FROM view2"), - properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default", - CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2", - s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id", - s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1")) - hiveContext.sessionState.catalog.createTable(view3, ignoreIfExists = false) - assertInvalidReference("SELECT * FROM view3") - } - } - - test("make sure we can resolve view created by old version of Spark") { - withTable("hive_table") { - withView("old_view") { - spark.sql("CREATE TABLE hive_table AS SELECT 1 AS a, 2 AS b") - // The views defined by older versions of Spark(before 2.2) will have empty view default - // database name, and all the relations referenced in the viewText will have database part - // defined. - val view = CatalogTable( - identifier = TableIdentifier("old_view"), - tableType = CatalogTableType.VIEW, - storage = CatalogStorageFormat.empty, - schema = new StructType().add("a", "int").add("b", "int"), - viewText = Some("SELECT `gen_attr_0` AS `a`, `gen_attr_1` AS `b` FROM (SELECT " + - "`gen_attr_0`, `gen_attr_1` FROM (SELECT `a` AS `gen_attr_0`, `b` AS " + - "`gen_attr_1` FROM hive_table) AS gen_subquery_0) AS hive_table") - ) - hiveContext.sessionState.catalog.createTable(view, ignoreIfExists = false) - val df = sql("SELECT * FROM old_view") - // Check the output rows. - checkAnswer(df, Row(1, 2)) - // Check the output schema. - assert(df.schema.sameType(view.schema)) - } - } - } - - test("resolve a view with custom column names") { - withTable("testTable") { - spark.range(1, 10).selectExpr("id", "id + 1 id1").write.saveAsTable("testTable") - withView("testView") { - val testView = CatalogTable( - identifier = TableIdentifier("testView"), - tableType = CatalogTableType.VIEW, - storage = CatalogStorageFormat.empty, - schema = new StructType().add("x", "long").add("y", "long"), - viewText = Some("SELECT * FROM testTable"), - properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default", - CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2", - s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id", - s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1")) - hiveContext.sessionState.catalog.createTable(testView, ignoreIfExists = false) - - // Correctly resolve a view with custom column names. - checkAnswer(sql("SELECT * FROM testView ORDER BY x"), (1 to 9).map(i => Row(i, i + 1))) - - // Correctly resolve a view when the referenced table schema changes. - spark.range(1, 10).selectExpr("id", "id + id dummy", "id + 1 id1") - .write.mode(SaveMode.Overwrite).saveAsTable("testTable") - checkAnswer(sql("SELECT * FROM testView ORDER BY x"), (1 to 9).map(i => Row(i, i + 1))) - - // Throw an AnalysisException if the column name is not found. - spark.range(1, 10).selectExpr("id", "id + 1 dummy") - .write.mode(SaveMode.Overwrite).saveAsTable("testTable") - intercept[AnalysisException](sql("SELECT * FROM testView")) - } - } - } - - test("resolve a view when the dataTypes of referenced table columns changed") { - withTable("testTable") { - spark.range(1, 10).selectExpr("id", "id + 1 id1").write.saveAsTable("testTable") - withView("testView") { - val testView = CatalogTable( - identifier = TableIdentifier("testView"), - tableType = CatalogTableType.VIEW, - storage = CatalogStorageFormat.empty, - schema = new StructType().add("id", "long").add("id1", "long"), - viewText = Some("SELECT * FROM testTable"), - properties = Map(CatalogTable.VIEW_DEFAULT_DATABASE -> "default", - CatalogTable.VIEW_QUERY_OUTPUT_NUM_COLUMNS -> "2", - s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}0" -> "id", - s"${CatalogTable.VIEW_QUERY_OUTPUT_COLUMN_NAME_PREFIX}1" -> "id1")) - hiveContext.sessionState.catalog.createTable(testView, ignoreIfExists = false) - - // Allow casting from IntegerType to LongType - val df = (1 until 10).map(i => (i, i + 1)).toDF("id", "id1") - df.write.format("json").mode(SaveMode.Overwrite).saveAsTable("testTable") - checkAnswer(sql("SELECT * FROM testView ORDER BY id1"), (1 to 9).map(i => Row(i, i + 1))) - - // Casting from DoubleType to LongType might truncate, throw an AnalysisException. - val df2 = (1 until 10).map(i => (i.toDouble, i.toDouble)).toDF("id", "id1") - df2.write.format("json").mode(SaveMode.Overwrite).saveAsTable("testTable") - intercept[AnalysisException](sql("SELECT * FROM testView")) - - // Can't cast from ArrayType to LongType, throw an AnalysisException. - val df3 = (1 until 10).map(i => (i, Seq(i))).toDF("id", "id1") - df3.write.format("json").mode(SaveMode.Overwrite).saveAsTable("testTable") - intercept[AnalysisException](sql("SELECT * FROM testView")) - } - } - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org