Repository: spark Updated Branches: refs/heads/branch-2.0 5fd354b2d -> cf728b0f2
[SPARK-17541][SQL] fix some DDL bugs about table management when same-name temp view exists In `SessionCatalog`, we have several operations(`tableExists`, `dropTable`, `loopupRelation`, etc) that handle both temp views and metastore tables/views. This brings some bugs to DDL commands that want to handle temp view only or metastore table/view only. These bugs are: 1. `CREATE TABLE USING` will fail if a same-name temp view exists 2. `Catalog.dropTempView`will un-cache and drop metastore table if a same-name table exists 3. `saveAsTable` will fail or have unexpected behaviour if a same-name temp view exists. These bug fixes are pulled out from https://github.com/apache/spark/pull/14962 and targets both master and 2.0 branch new regression tests Author: Wenchen Fan <wenc...@databricks.com> Closes #15099 from cloud-fan/fix-view. (cherry picked from commit 3fe630d314cf50d69868b7707ac8d8d2027080b8) Signed-off-by: Wenchen Fan <wenc...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cf728b0f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cf728b0f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cf728b0f Branch: refs/heads/branch-2.0 Commit: cf728b0f2dc7c1e9f62a8984122d3bf91e6ba439 Parents: 5fd354b Author: Wenchen Fan <wenc...@databricks.com> Authored: Sun Sep 18 21:15:35 2016 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Sun Sep 18 21:50:05 2016 +0800 ---------------------------------------------------------------------- .../sql/catalyst/catalog/SessionCatalog.scala | 32 ++++++--- .../catalyst/catalog/SessionCatalogSuite.scala | 24 +++---- .../org/apache/spark/sql/DataFrameWriter.scala | 9 ++- .../command/createDataSourceTables.scala | 30 ++++---- .../apache/spark/sql/internal/CatalogImpl.scala | 6 +- .../org/apache/spark/sql/SQLQuerySuite.scala | 11 +++ .../spark/sql/internal/CatalogSuite.scala | 11 +++ .../sql/test/DataFrameReaderWriterSuite.scala | 76 ++++++++++++++++++++ .../sql/hive/MetastoreDataSourcesSuite.scala | 13 ++-- .../sql/sources/HadoopFsRelationTest.scala | 10 ++- 10 files changed, 172 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/cf728b0f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 2448513..ecb4dab 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -332,9 +332,9 @@ class SessionCatalog( new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toString } - // ------------------------------------------------------------- - // | Methods that interact with temporary and metastore tables | - // ------------------------------------------------------------- + // ---------------------------------------------- + // | Methods that interact with temp views only | + // ---------------------------------------------- /** * Create a temporary table. @@ -351,6 +351,24 @@ class SessionCatalog( } /** + * Return a temporary view exactly as it was stored. + */ + def getTempView(name: String): Option[LogicalPlan] = synchronized { + tempTables.get(formatTableName(name)) + } + + /** + * Drop a temporary view. + */ + def dropTempView(name: String): Unit = synchronized { + tempTables.remove(formatTableName(name)) + } + + // ------------------------------------------------------------- + // | Methods that interact with temporary and metastore tables | + // ------------------------------------------------------------- + + /** * Rename a table. * * If a database is specified in `oldName`, this will rename the table in that database. @@ -506,14 +524,6 @@ class SessionCatalog( tempTables.clear() } - /** - * Return a temporary table exactly as it was stored. - * For testing only. - */ - private[catalog] def getTempTable(name: String): Option[LogicalPlan] = synchronized { - tempTables.get(formatTableName(name)) - } - // ---------------------------------------------------------------------------- // Partitions // ---------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/cf728b0f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 67ca0aa..574ed05 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -201,16 +201,16 @@ class SessionCatalogSuite extends SparkFunSuite { val tempTable2 = Range(1, 20, 2, 10) catalog.createTempView("tbl1", tempTable1, overrideIfExists = false) catalog.createTempView("tbl2", tempTable2, overrideIfExists = false) - assert(catalog.getTempTable("tbl1") == Option(tempTable1)) - assert(catalog.getTempTable("tbl2") == Option(tempTable2)) - assert(catalog.getTempTable("tbl3").isEmpty) + assert(catalog.getTempView("tbl1") == Option(tempTable1)) + assert(catalog.getTempView("tbl2") == Option(tempTable2)) + assert(catalog.getTempView("tbl3").isEmpty) // Temporary table already exists intercept[TempTableAlreadyExistsException] { catalog.createTempView("tbl1", tempTable1, overrideIfExists = false) } // Temporary table already exists but we override it catalog.createTempView("tbl1", tempTable2, overrideIfExists = true) - assert(catalog.getTempTable("tbl1") == Option(tempTable2)) + assert(catalog.getTempView("tbl1") == Option(tempTable2)) } test("drop table") { @@ -246,11 +246,11 @@ class SessionCatalogSuite extends SparkFunSuite { val tempTable = Range(1, 10, 2, 10) sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false) sessionCatalog.setCurrentDatabase("db2") - assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable)) + assert(sessionCatalog.getTempView("tbl1") == Some(tempTable)) assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) // If database is not specified, temp table should be dropped first sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false) - assert(sessionCatalog.getTempTable("tbl1") == None) + assert(sessionCatalog.getTempView("tbl1") == None) assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) // If temp table does not exist, the table in the current database should be dropped sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false) @@ -259,7 +259,7 @@ class SessionCatalogSuite extends SparkFunSuite { sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false) sessionCatalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false) sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false) - assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable)) + assert(sessionCatalog.getTempView("tbl1") == Some(tempTable)) assert(externalCatalog.listTables("db2").toSet == Set("tbl2")) } @@ -307,18 +307,18 @@ class SessionCatalogSuite extends SparkFunSuite { val tempTable = Range(1, 10, 2, 10) sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false) sessionCatalog.setCurrentDatabase("db2") - assert(sessionCatalog.getTempTable("tbl1") == Option(tempTable)) + assert(sessionCatalog.getTempView("tbl1") == Option(tempTable)) assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) // If database is not specified, temp table should be renamed first sessionCatalog.renameTable(TableIdentifier("tbl1"), TableIdentifier("tbl3")) - assert(sessionCatalog.getTempTable("tbl1").isEmpty) - assert(sessionCatalog.getTempTable("tbl3") == Option(tempTable)) + assert(sessionCatalog.getTempView("tbl1").isEmpty) + assert(sessionCatalog.getTempView("tbl3") == Option(tempTable)) assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) // If database is specified, temp tables are never renamed sessionCatalog.renameTable( TableIdentifier("tbl2", Some("db2")), TableIdentifier("tbl4", Some("db2"))) - assert(sessionCatalog.getTempTable("tbl3") == Option(tempTable)) - assert(sessionCatalog.getTempTable("tbl4").isEmpty) + assert(sessionCatalog.getTempView("tbl3") == Option(tempTable)) + assert(sessionCatalog.getTempView("tbl4").isEmpty) assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl4")) } http://git-wip-us.apache.org/repos/asf/spark/blob/cf728b0f/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index a4c4a5d..b448b9e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -356,7 +356,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { private def saveAsTable(tableIdent: TableIdentifier): Unit = { - val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent) + val sessionState = df.sparkSession.sessionState + val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) + val tableIdentWithDB = tableIdent.copy(database = Some(db)) + // Pass a table identifier with database part, so that `tableExists` won't check temp views + // unexpectedly. + val tableExists = sessionState.catalog.tableExists(tableIdentWithDB) (tableExists, mode) match { case (true, SaveMode.Ignore) => @@ -375,7 +380,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { mode, extraOptions.toMap, df.logicalPlan) - df.sparkSession.sessionState.executePlan(cmd).toRdd + sessionState.executePlan(cmd).toRdd } } http://git-wip-us.apache.org/repos/asf/spark/blob/cf728b0f/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 900446c..06965ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.HiveSerDe @@ -74,14 +73,16 @@ case class CreateDataSourceTableCommand( s"characters, numbers and _.") } - val tableName = tableIdent.unquotedString val sessionState = sparkSession.sessionState - - if (sessionState.catalog.tableExists(tableIdent)) { + val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) + val tableIdentWithDB = tableIdent.copy(database = Some(db)) + // Pass a table identifier with database part, so that `tableExists` won't check temp views + // unexpectedly. + if (sessionState.catalog.tableExists(tableIdentWithDB)) { if (ignoreIfExists) { return Seq.empty[Row] } else { - throw new AnalysisException(s"Table $tableName already exists.") + throw new AnalysisException(s"Table ${tableIdentWithDB.quotedString} already exists.") } } @@ -157,8 +158,11 @@ case class CreateDataSourceTableAsSelectCommand( s"characters, numbers and _.") } - val tableName = tableIdent.unquotedString val sessionState = sparkSession.sessionState + val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase) + val tableIdentWithDB = tableIdent.copy(database = Some(db)) + val tableName = tableIdentWithDB.unquotedString + var createMetastoreTable = false var isExternal = true val optionsWithPath = @@ -170,7 +174,9 @@ case class CreateDataSourceTableAsSelectCommand( } var existingSchema = Option.empty[StructType] - if (sparkSession.sessionState.catalog.tableExists(tableIdent)) { + // Pass a table identifier with database part, so that `tableExists` won't check temp views + // unexpectedly. + if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) { // Check if we need to throw an exception or just return. mode match { case SaveMode.ErrorIfExists => @@ -195,13 +201,13 @@ case class CreateDataSourceTableAsSelectCommand( // inserting into (i.e. using the same compression). EliminateSubqueryAliases( - sessionState.catalog.lookupRelation(tableIdent)) match { + sessionState.catalog.lookupRelation(tableIdentWithDB)) match { case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) => // check if the file formats match l.relation match { case r: HadoopFsRelation if r.fileFormat.getClass != dataSource.providingClass => throw new AnalysisException( - s"The file format of the existing table $tableIdent is " + + s"The file format of the existing table $tableName is " + s"`${r.fileFormat.getClass.getName}`. It doesn't match the specified " + s"format `$provider`") case _ => @@ -218,7 +224,7 @@ case class CreateDataSourceTableAsSelectCommand( throw new AnalysisException(s"Saving data in ${o.toString} is not supported.") } case SaveMode.Overwrite => - sparkSession.sql(s"DROP TABLE IF EXISTS $tableName") + sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true) // Need to create the table again. createMetastoreTable = true } @@ -246,7 +252,7 @@ case class CreateDataSourceTableAsSelectCommand( dataSource.write(mode, df) } catch { case ex: AnalysisException => - logError(s"Failed to write to table ${tableIdent.identifier} in $mode mode", ex) + logError(s"Failed to write to table $tableName in $mode mode", ex) throw ex } if (createMetastoreTable) { @@ -265,7 +271,7 @@ case class CreateDataSourceTableAsSelectCommand( } // Refresh the cache of the table in the catalog. - sessionState.catalog.refreshTable(tableIdent) + sessionState.catalog.refreshTable(tableIdentWithDB) Seq.empty[Row] } } http://git-wip-us.apache.org/repos/asf/spark/blob/cf728b0f/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index a6ae6fe..2067e7e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -296,8 +296,10 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * @since 2.0.0 */ override def dropTempView(viewName: String): Unit = { - sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(viewName)) - sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = true) + sparkSession.sessionState.catalog.getTempView(viewName).foreach { tempView => + sparkSession.sharedState.cacheManager.uncacheQuery(Dataset.ofRows(sparkSession, tempView)) + sessionCatalog.dropTempView(viewName) + } } /** http://git-wip-us.apache.org/repos/asf/spark/blob/cf728b0f/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index f20b9ad..cf25097 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2621,4 +2621,15 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { }.limit(1).queryExecution.toRdd.count() assert(numRecordsRead.value === 10) } + + test("CREATE TABLE USING should not fail if a same-name temp view exists") { + withTable("same_name") { + withTempView("same_name") { + spark.range(10).createTempView("same_name") + sql("CREATE TABLE same_name(i int) USING json") + checkAnswer(spark.table("same_name"), spark.range(10).toDF()) + assert(spark.table("default.same_name").collect().isEmpty) + } + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/cf728b0f/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index d75df56..e62ae38 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -304,6 +304,17 @@ class CatalogSuite columnFields.foreach { f => assert(columnString.contains(f.toString)) } } + test("dropTempView should not un-cache and drop metastore table if a same-name table exists") { + withTable("same_name") { + spark.range(10).write.saveAsTable("same_name") + sql("CACHE TABLE same_name") + assert(spark.catalog.isCached("default.same_name")) + spark.catalog.dropTempView("same_name") + assert(spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default")))) + assert(spark.catalog.isCached("default.same_name")) + } + } + // TODO: add tests for the rest of them } http://git-wip-us.apache.org/repos/asf/spark/blob/cf728b0f/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index e071aef..bba265e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -22,6 +22,7 @@ import java.io.File import org.scalatest.BeforeAndAfter import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.util.Utils @@ -439,4 +440,79 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be checkAnswer(df, spark.createDataset(expectedResult).toDF()) assert(df.schema === expectedSchema) } + + test("saveAsTable with mode Append should not fail if the table not exists " + + "but a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + spark.range(10).createTempView("same_name") + spark.range(20).write.mode(SaveMode.Append).saveAsTable("same_name") + assert( + spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default")))) + } + } + } + + test("saveAsTable with mode Append should not fail if the table already exists " + + "and a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + sql("CREATE TABLE same_name(id LONG) USING parquet") + spark.range(10).createTempView("same_name") + spark.range(20).write.mode(SaveMode.Append).saveAsTable("same_name") + checkAnswer(spark.table("same_name"), spark.range(10).toDF()) + checkAnswer(spark.table("default.same_name"), spark.range(20).toDF()) + } + } + } + + test("saveAsTable with mode ErrorIfExists should not fail if the table not exists " + + "but a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + spark.range(10).createTempView("same_name") + spark.range(20).write.mode(SaveMode.ErrorIfExists).saveAsTable("same_name") + assert( + spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default")))) + } + } + } + + test("saveAsTable with mode Overwrite should not drop the temp view if the table not exists " + + "but a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + spark.range(10).createTempView("same_name") + spark.range(20).write.mode(SaveMode.Overwrite).saveAsTable("same_name") + assert(spark.sessionState.catalog.getTempView("same_name").isDefined) + assert( + spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default")))) + } + } + } + + test("saveAsTable with mode Overwrite should not fail if the table already exists " + + "and a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + sql("CREATE TABLE same_name(id LONG) USING parquet") + spark.range(10).createTempView("same_name") + spark.range(20).write.mode(SaveMode.Overwrite).saveAsTable("same_name") + checkAnswer(spark.table("same_name"), spark.range(10).toDF()) + checkAnswer(spark.table("default.same_name"), spark.range(20).toDF()) + } + } + } + + test("saveAsTable with mode Ignore should create the table if the table not exists " + + "but a same-name temp view exist") { + withTable("same_name") { + withTempView("same_name") { + spark.range(10).createTempView("same_name") + spark.range(20).write.mode(SaveMode.Ignore).saveAsTable("same_name") + assert( + spark.sessionState.catalog.tableExists(TableIdentifier("same_name", Some("default")))) + } + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/cf728b0f/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index a0b3b37..b758ab0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -333,7 +333,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv }.getMessage assert( - message.contains("Table ctasJsonTable already exists."), + message.contains("Table default.ctasJsonTable already exists."), "We should complain that ctasJsonTable already exists") // The following statement should be fine if it has IF NOT EXISTS. @@ -509,7 +509,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv assert( intercept[AnalysisException] { sparkSession.catalog.createExternalTable("createdJsonTable", jsonFilePath.toString) - }.getMessage.contains("Table createdJsonTable already exists."), + }.getMessage.contains("Table default.createdJsonTable already exists."), "We should complain that createdJsonTable already exists") } @@ -901,7 +901,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val e = intercept[AnalysisException] { createDF(10, 19).write.mode(SaveMode.Append).format("orc").saveAsTable("appendOrcToParquet") } - assert(e.getMessage.contains("The file format of the existing table `appendOrcToParquet` " + + assert(e.getMessage.contains( + "The file format of the existing table default.appendOrcToParquet " + "is `org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat`. " + "It doesn't match the specified format `orc`")) } @@ -912,7 +913,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv createDF(10, 19).write.mode(SaveMode.Append).format("parquet") .saveAsTable("appendParquetToJson") } - assert(e.getMessage.contains("The file format of the existing table `appendParquetToJson` " + + assert(e.getMessage.contains( + "The file format of the existing table default.appendParquetToJson " + "is `org.apache.spark.sql.execution.datasources.json.JsonFileFormat`. " + "It doesn't match the specified format `parquet`")) } @@ -923,7 +925,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv createDF(10, 19).write.mode(SaveMode.Append).format("text") .saveAsTable("appendTextToJson") } - assert(e.getMessage.contains("The file format of the existing table `appendTextToJson` is " + + assert(e.getMessage.contains( + "The file format of the existing table default.appendTextToJson is " + "`org.apache.spark.sql.execution.datasources.json.JsonFileFormat`. " + "It doesn't match the specified format `text`")) } http://git-wip-us.apache.org/repos/asf/spark/blob/cf728b0f/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala index 047b08c..97f2b23 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala @@ -337,9 +337,8 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } test("saveAsTable()/load() - non-partitioned table - ErrorIfExists") { - Seq.empty[(Int, String)].toDF().createOrReplaceTempView("t") - - withTempView("t") { + withTable("t") { + sql("CREATE TABLE t(i INT) USING parquet") intercept[AnalysisException] { testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).saveAsTable("t") } @@ -347,9 +346,8 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes } test("saveAsTable()/load() - non-partitioned table - Ignore") { - Seq.empty[(Int, String)].toDF().createOrReplaceTempView("t") - - withTempView("t") { + withTable("t") { + sql("CREATE TABLE t(i INT) USING parquet") testDF.write.format(dataSourceName).mode(SaveMode.Ignore).saveAsTable("t") assert(spark.table("t").collect().isEmpty) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org