Repository: spark Updated Branches: refs/heads/master ed0b4070f -> 5c6b08557
[SPARK-14603][SQL] Verification of Metadata Operations by Session Catalog Since we cannot rely on the underlying external catalog to throw exceptions when there is an invalid metadata operation, let's perform the verification in SessionCatalog. - [X] The first step is to unify the error messages issued by the Hive-specific Session Catalog and the general Session Catalog. - [X] The second step is to verify the inputs of metadata operations for partitioning-related operations. This is moved to a separate PR: https://github.com/apache/spark/pull/12801 - [X] The third step is to add database existence verification in `SessionCatalog` - [X] The fourth step is to add table existence verification in `SessionCatalog` - [X] The fifth step is to add function existence verification in `SessionCatalog` Add test cases and verify the error messages we issued Author: gatorsmile <gatorsm...@gmail.com> Author: xiaoli <lixiao1...@gmail.com> Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local> Closes #12385 from gatorsmile/verifySessionAPIs. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c6b0855 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c6b0855 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c6b0855 Branch: refs/heads/master Commit: 5c6b0855787c080d3e233eb09c05c025395e7cb3 Parents: ed0b407 Author: gatorsmile <gatorsm...@gmail.com> Authored: Tue May 10 11:25:39 2016 -0700 Committer: Andrew Or <and...@databricks.com> Committed: Tue May 10 11:25:55 2016 -0700 ---------------------------------------------------------------------- python/pyspark/sql/utils.py | 2 + .../analysis/AlreadyExistException.scala | 49 ++++++ .../catalyst/analysis/NoSuchItemException.scala | 8 + .../sql/catalyst/catalog/ExternalCatalog.scala | 6 +- .../sql/catalyst/catalog/InMemoryCatalog.scala | 37 ++--- .../sql/catalyst/catalog/SessionCatalog.scala | 84 +++++++++-- .../catalyst/catalog/SessionCatalogSuite.scala | 148 ++++++++++++------- .../spark/sql/execution/command/DDLSuite.scala | 14 +- .../sql/hive/execution/HiveCommandSuite.scala | 6 +- .../sql/hive/execution/SQLQuerySuite.scala | 30 ++-- 10 files changed, 261 insertions(+), 123 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5c6b0855/python/pyspark/sql/utils.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py index cb172d2..36c9322 100644 --- a/python/pyspark/sql/utils.py +++ b/python/pyspark/sql/utils.py @@ -61,6 +61,8 @@ def capture_sql_exception(f): e.java_exception.getStackTrace())) if s.startswith('org.apache.spark.sql.AnalysisException: '): raise AnalysisException(s.split(': ', 1)[1], stackTrace) + if s.startswith('org.apache.spark.sql.catalyst.analysis.NoSuchTableException: '): + raise AnalysisException(s.split(': ', 1)[1], stackTrace) if 
s.startswith('org.apache.spark.sql.catalyst.parser.ParseException: '): raise ParseException(s.split(': ', 1)[1], stackTrace) if s.startswith('org.apache.spark.sql.ContinuousQueryException: '): http://git-wip-us.apache.org/repos/asf/spark/blob/5c6b0855/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala new file mode 100644 index 0000000..ec56fe7 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AlreadyExistException.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec + +/** + * Thrown by a catalog when an item already exists. The analyzer will rethrow the exception + * as an [[org.apache.spark.sql.AnalysisException]] with the correct position information. 
+ */ +class DatabaseAlreadyExistsException(db: String) + extends AnalysisException(s"Database '$db' already exists") + +class TableAlreadyExistsException(db: String, table: String) + extends AnalysisException(s"Table or view '$table' already exists in database '$db'") + +class TempTableAlreadyExistsException(table: String) + extends AnalysisException(s"Temporary table '$table' already exists") + +class PartitionAlreadyExistsException(db: String, table: String, spec: TablePartitionSpec) + extends AnalysisException( + s"Partition already exists in table '$table' database '$db':\n" + spec.mkString("\n")) + +class PartitionsAlreadyExistException(db: String, table: String, specs: Seq[TablePartitionSpec]) + extends AnalysisException( + s"The following partitions already exists in table '$table' database '$db':\n" + + specs.mkString("\n===\n")) + +class FunctionAlreadyExistsException(db: String, func: String) + extends AnalysisException(s"Function '$func' already exists in database '$db'") + +class TempFunctionAlreadyExistsException(func: String) + extends AnalysisException(s"Temporary function '$func' already exists") http://git-wip-us.apache.org/repos/asf/spark/blob/5c6b0855/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala index ff13bce..8febdca 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchItemException.scala @@ -44,3 +44,11 @@ class NoSuchFunctionException(db: String, func: String) extends AnalysisException( s"Undefined function: '$func'. 
This function is neither a registered temporary function nor " + s"a permanent function registered in the database '$db'.") + +class NoSuchPartitionsException(db: String, table: String, specs: Seq[TablePartitionSpec]) + extends AnalysisException( + s"The following partitions not found in table '$table' database '$db':\n" + + specs.mkString("\n===\n")) + +class NoSuchTempFunctionException(func: String) + extends AnalysisException(s"Temporary function '$func' not found") http://git-wip-us.apache.org/repos/asf/spark/blob/5c6b0855/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 178ae6d..81974b2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.catalog -import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException /** @@ -27,14 +27,14 @@ import org.apache.spark.sql.AnalysisException * can be accessed in multiple threads. This is an external catalog because it is expected to * interact with external systems. * - * Implementations should throw [[AnalysisException]] when table or database don't exist. + * Implementations should throw [[NoSuchDatabaseException]] when table or database don't exist. 
*/ abstract class ExternalCatalog { import CatalogTypes.TablePartitionSpec protected def requireDbExists(db: String): Unit = { if (!databaseExists(db)) { - throw new AnalysisException(s"Database '$db' does not exist") + throw new NoSuchDatabaseException(db) } } http://git-wip-us.apache.org/repos/asf/spark/blob/5c6b0855/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 982b035..21da55c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.SparkException import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.util.StringUtils /** @@ -60,29 +61,25 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E private def requireFunctionExists(db: String, funcName: String): Unit = { if (!functionExists(db, funcName)) { - throw new AnalysisException( - s"Function not found: '$funcName' does not exist in database '$db'") + throw new NoSuchFunctionException(db = db, func = funcName) } } private def requireFunctionNotExists(db: String, funcName: String): Unit = { if (functionExists(db, funcName)) { - throw new AnalysisException( - s"Function already exists: '$funcName' exists in database '$db'") + throw new FunctionAlreadyExistsException(db = db, func = funcName) } } private def requireTableExists(db: String, table: String): Unit = { if (!tableExists(db, table)) { - throw new 
AnalysisException( - s"Table or view not found: '$table' does not exist in database '$db'") + throw new NoSuchTableException(db = db, table = table) } } private def requireTableNotExists(db: String, table: String): Unit = { if (tableExists(db, table)) { - throw new AnalysisException( - s"Table or view exists: '$table' exists in database '$db'") + throw new TableAlreadyExistsException(db = db, table = table) } } @@ -92,8 +89,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E specs: Seq[TablePartitionSpec]): Unit = { specs foreach { s => if (!partitionExists(db, table, s)) { - throw new AnalysisException( - s"Partition not found: database '$db' table '$table' does not contain: '$s'") + throw new NoSuchPartitionException(db = db, table = table, spec = s) } } } @@ -104,8 +100,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E specs: Seq[TablePartitionSpec]): Unit = { specs foreach { s => if (partitionExists(db, table, s)) { - throw new AnalysisException( - s"Partition exists: database '$db' table '$table' already contains: '$s'") + throw new PartitionAlreadyExistsException(db = db, table = table, spec = s) } } } @@ -121,7 +116,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E ignoreIfExists: Boolean): Unit = synchronized { if (catalog.contains(dbDefinition.name)) { if (!ignoreIfExists) { - throw new AnalysisException(s"Database '${dbDefinition.name}' already exists.") + throw new DatabaseAlreadyExistsException(dbDefinition.name) } } else { try { @@ -161,7 +156,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E catalog.remove(db) } else { if (!ignoreIfNotExists) { - throw new AnalysisException(s"Database '$db' does not exist") + throw new NoSuchDatabaseException(db) } } } @@ -202,7 +197,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E val table = tableDefinition.identifier.table if 
(tableExists(db, table)) { if (!ignoreIfExists) { - throw new AnalysisException(s"Table '$table' already exists in database '$db'") + throw new TableAlreadyExistsException(db = db, table = table) } } else { if (tableDefinition.tableType == CatalogTableType.MANAGED) { @@ -238,7 +233,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E catalog(db).tables.remove(table) } else { if (!ignoreIfNotExists) { - throw new AnalysisException(s"Table or view '$table' does not exist in database '$db'") + throw new NoSuchTableException(db = db, table = table) } } } @@ -328,9 +323,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E if (!ignoreIfExists) { val dupSpecs = parts.collect { case p if existingParts.contains(p.spec) => p.spec } if (dupSpecs.nonEmpty) { - val dupSpecsStr = dupSpecs.mkString("\n===\n") - throw new AnalysisException("The following partitions already exist in database " + - s"'$db' table '$table':\n$dupSpecsStr") + throw new PartitionsAlreadyExistException(db = db, table = table, specs = dupSpecs) } } @@ -365,9 +358,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E if (!ignoreIfNotExists) { val missingSpecs = partSpecs.collect { case s if !existingParts.contains(s) => s } if (missingSpecs.nonEmpty) { - val missingSpecsStr = missingSpecs.mkString("\n===\n") - throw new AnalysisException("The following partitions do not exist in database " + - s"'$db' table '$table':\n$missingSpecsStr") + throw new NoSuchPartitionsException(db = db, table = table, specs = missingSpecs) } } @@ -467,7 +458,7 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends E override def createFunction(db: String, func: CatalogFunction): Unit = synchronized { requireDbExists(db) if (functionExists(db, func.identifier.funcName)) { - throw new AnalysisException(s"Function '$func' already exists in '$db' database") + throw new FunctionAlreadyExistsException(db = db, 
func = func.identifier.funcName) } else { catalog(db).functions.put(func.identifier.funcName, func) } http://git-wip-us.apache.org/repos/asf/spark/blob/5c6b0855/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index b267798..7505e2c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchFunctionException, NoSuchPermanentFunctionException, SimpleFunctionRegistry} +import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} @@ -111,6 +111,25 @@ class SessionCatalog( fs.makeQualified(hadoopPath) } + protected[this] def requireDbExists(db: String): Unit = { + if (!databaseExists(db)) { + throw new NoSuchDatabaseException(db) + } + } + + protected[this] def requireTableExists(name: TableIdentifier): Unit = { + if (!tableExists(name)) { + val db = name.database.getOrElse(currentDb) + throw new NoSuchTableException(db = db, table = name.table) + } + } + + private def requireTableNotExists(name: TableIdentifier): Unit = { + if (tableExists(name)) { + val db = name.database.getOrElse(currentDb) + throw new 
TableAlreadyExistsException(db = db, table = name.table) + } + } // ---------------------------------------------------------------------------- // Databases // ---------------------------------------------------------------------------- @@ -135,11 +154,13 @@ class SessionCatalog( def alterDatabase(dbDefinition: CatalogDatabase): Unit = { val dbName = formatDatabaseName(dbDefinition.name) + requireDbExists(dbName) externalCatalog.alterDatabase(dbDefinition.copy(name = dbName)) } def getDatabaseMetadata(db: String): CatalogDatabase = { val dbName = formatDatabaseName(db) + requireDbExists(dbName) externalCatalog.getDatabase(dbName) } @@ -160,9 +181,7 @@ class SessionCatalog( def setCurrentDatabase(db: String): Unit = { val dbName = formatDatabaseName(db) - if (!databaseExists(dbName)) { - throw new AnalysisException(s"Database '$dbName' does not exist.") - } + requireDbExists(dbName) synchronized { currentDb = dbName } } @@ -196,6 +215,7 @@ class SessionCatalog( val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase)) val table = formatTableName(tableDefinition.identifier.table) val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db))) + requireDbExists(db) externalCatalog.createTable(db, newTableDefinition, ignoreIfExists) } @@ -211,18 +231,23 @@ class SessionCatalog( def alterTable(tableDefinition: CatalogTable): Unit = { val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase)) val table = formatTableName(tableDefinition.identifier.table) - val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db))) + val tableIdentifier = TableIdentifier(table, Some(db)) + val newTableDefinition = tableDefinition.copy(identifier = tableIdentifier) + requireDbExists(db) + requireTableExists(tableIdentifier) externalCatalog.alterTable(db, newTableDefinition) } /** * Retrieve the metadata of an existing metastore table. 
* If no database is specified, assume the table is in the current database. - * If the specified table is not found in the database then an [[AnalysisException]] is thrown. + * If the specified table is not found in the database then an [[NoSuchTableException]] is thrown. */ def getTableMetadata(name: TableIdentifier): CatalogTable = { val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) val table = formatTableName(name.table) + requireDbExists(db) + requireTableExists(TableIdentifier(table, Some(db))) externalCatalog.getTable(db, table) } @@ -234,13 +259,14 @@ class SessionCatalog( def getTableMetadataOption(name: TableIdentifier): Option[CatalogTable] = { val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) val table = formatTableName(name.table) + requireDbExists(db) externalCatalog.getTableOption(db, table) } /** * Load files stored in given path into an existing metastore table. * If no database is specified, assume the table is in the current database. - * If the specified table is not found in the database then an [[AnalysisException]] is thrown. + * If the specified table is not found in the database then an [[NoSuchTableException]] is thrown. */ def loadTable( name: TableIdentifier, @@ -249,13 +275,15 @@ class SessionCatalog( holdDDLTime: Boolean): Unit = { val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) val table = formatTableName(name.table) + requireDbExists(db) + requireTableExists(TableIdentifier(table, Some(db))) externalCatalog.loadTable(db, table, loadPath, isOverwrite, holdDDLTime) } /** * Load files stored in given path into the partition of an existing metastore table. * If no database is specified, assume the table is in the current database. - * If the specified table is not found in the database then an [[AnalysisException]] is thrown. + * If the specified table is not found in the database then an [[NoSuchTableException]] is thrown. 
*/ def loadPartition( name: TableIdentifier, @@ -267,6 +295,8 @@ class SessionCatalog( isSkewedStoreAsSubdir: Boolean): Unit = { val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) val table = formatTableName(name.table) + requireDbExists(db) + requireTableExists(TableIdentifier(table, Some(db))) externalCatalog.loadPartition(db, table, loadPath, partition, isOverwrite, holdDDLTime, inheritTableSpecs, isSkewedStoreAsSubdir) } @@ -291,7 +321,7 @@ class SessionCatalog( overrideIfExists: Boolean): Unit = synchronized { val table = formatTableName(name) if (tempTables.contains(table) && !overrideIfExists) { - throw new AnalysisException(s"Temporary table '$name' already exists.") + throw new TempTableAlreadyExistsException(name) } tempTables.put(table, tableDefinition) } @@ -307,6 +337,7 @@ class SessionCatalog( */ def renameTable(oldName: TableIdentifier, newName: TableIdentifier): Unit = synchronized { val db = formatDatabaseName(oldName.database.getOrElse(currentDb)) + requireDbExists(db) val newDb = formatDatabaseName(newName.database.getOrElse(currentDb)) if (db != newDb) { throw new AnalysisException( @@ -315,6 +346,8 @@ class SessionCatalog( val oldTableName = formatTableName(oldName.table) val newTableName = formatTableName(newName.table) if (oldName.database.isDefined || !tempTables.contains(oldTableName)) { + requireTableExists(TableIdentifier(oldTableName, Some(db))) + requireTableNotExists(TableIdentifier(newTableName, Some(db))) externalCatalog.renameTable(db, oldTableName, newTableName) } else { if (newName.database.isDefined) { @@ -343,12 +376,13 @@ class SessionCatalog( val db = formatDatabaseName(name.database.getOrElse(currentDb)) val table = formatTableName(name.table) if (name.database.isDefined || !tempTables.contains(table)) { + requireDbExists(db) // When ignoreIfNotExists is false, no exception is issued when the table does not exist. // Instead, log it as an error message. 
- if (externalCatalog.tableExists(db, table)) { + if (tableExists(TableIdentifier(table, Option(db)))) { externalCatalog.dropTable(db, table, ignoreIfNotExists = true) } else if (!ignoreIfNotExists) { - throw new AnalysisException(s"Table or view '${name.quotedString}' does not exist") + throw new NoSuchTableException(db = db, table = table) } } else { tempTables.remove(table) @@ -418,6 +452,7 @@ class SessionCatalog( */ def listTables(db: String, pattern: String): Seq[TableIdentifier] = { val dbName = formatDatabaseName(db) + requireDbExists(dbName) val dbTables = externalCatalog.listTables(dbName, pattern).map { t => TableIdentifier(t, Some(dbName)) } synchronized { @@ -477,6 +512,8 @@ class SessionCatalog( ignoreIfExists: Boolean): Unit = { val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) val table = formatTableName(tableName.table) + requireDbExists(db) + requireTableExists(TableIdentifier(table, Option(db))) externalCatalog.createPartitions(db, table, parts, ignoreIfExists) } @@ -490,6 +527,8 @@ class SessionCatalog( ignoreIfNotExists: Boolean): Unit = { val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) val table = formatTableName(tableName.table) + requireDbExists(db) + requireTableExists(TableIdentifier(table, Option(db))) externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists) } @@ -505,6 +544,8 @@ class SessionCatalog( newSpecs: Seq[TablePartitionSpec]): Unit = { val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) val table = formatTableName(tableName.table) + requireDbExists(db) + requireTableExists(TableIdentifier(table, Option(db))) externalCatalog.renamePartitions(db, table, specs, newSpecs) } @@ -520,6 +561,8 @@ class SessionCatalog( def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = { val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) val table = formatTableName(tableName.table) + 
requireDbExists(db) + requireTableExists(TableIdentifier(table, Option(db))) externalCatalog.alterPartitions(db, table, parts) } @@ -530,6 +573,8 @@ class SessionCatalog( def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = { val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) val table = formatTableName(tableName.table) + requireDbExists(db) + requireTableExists(TableIdentifier(table, Option(db))) externalCatalog.getPartition(db, table, spec) } @@ -545,6 +590,8 @@ class SessionCatalog( partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = { val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase)) val table = formatTableName(tableName.table) + requireDbExists(db) + requireTableExists(TableIdentifier(table, Option(db))) externalCatalog.listPartitions(db, table, partialSpec) } @@ -567,12 +614,13 @@ class SessionCatalog( */ def createFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = { val db = formatDatabaseName(funcDefinition.identifier.database.getOrElse(getCurrentDatabase)) + requireDbExists(db) val identifier = FunctionIdentifier(funcDefinition.identifier.funcName, Some(db)) val newFuncDefinition = funcDefinition.copy(identifier = identifier) if (!functionExists(identifier)) { externalCatalog.createFunction(db, newFuncDefinition) } else if (!ignoreIfExists) { - throw new AnalysisException(s"Function '$identifier' already exists in database '$db'") + throw new FunctionAlreadyExistsException(db = db, func = identifier.toString) } } @@ -582,6 +630,7 @@ class SessionCatalog( */ def dropFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = { val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) + requireDbExists(db) val identifier = name.copy(database = Some(db)) if (functionExists(identifier)) { // TODO: registry should just take in FunctionIdentifier for type safety @@ -594,7 +643,7 @@ class 
SessionCatalog( } externalCatalog.dropFunction(db, name.funcName) } else if (!ignoreIfNotExists) { - throw new AnalysisException(s"function '$identifier' does not exist in database '$db'") + throw new NoSuchFunctionException(db = db, func = identifier.toString) } } @@ -606,6 +655,7 @@ class SessionCatalog( */ def getFunctionMetadata(name: FunctionIdentifier): CatalogFunction = { val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) + requireDbExists(db) externalCatalog.getFunction(db, name.funcName) } @@ -614,6 +664,7 @@ class SessionCatalog( */ def functionExists(name: FunctionIdentifier): Boolean = { val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase)) + requireDbExists(db) functionRegistry.functionExists(name.unquotedString) || externalCatalog.functionExists(db, name.funcName) } @@ -654,7 +705,7 @@ class SessionCatalog( funcDefinition: FunctionBuilder, ignoreIfExists: Boolean): Unit = { if (functionRegistry.lookupFunctionBuilder(name).isDefined && !ignoreIfExists) { - throw new AnalysisException(s"Temporary function '$name' already exists.") + throw new TempFunctionAlreadyExistsException(name) } functionRegistry.registerFunction(name, info, funcDefinition) } @@ -664,8 +715,7 @@ class SessionCatalog( */ def dropTempFunction(name: String, ignoreIfNotExists: Boolean): Unit = { if (!functionRegistry.dropFunction(name) && !ignoreIfNotExists) { - throw new AnalysisException( - s"Temporary function '$name' cannot be dropped because it does not exist!") + throw new NoSuchTempFunctionException(name) } } @@ -684,6 +734,7 @@ class SessionCatalog( .orElse(functionRegistry.lookupFunction(qualifiedName.unquotedString)) .getOrElse { val db = qualifiedName.database.get + requireDbExists(db) if (externalCatalog.functionExists(db, name.funcName)) { val metadata = externalCatalog.getFunction(db, name.funcName) new ExpressionInfo(metadata.className, qualifiedName.unquotedString) @@ -760,6 +811,7 @@ class SessionCatalog( */ def 
listFunctions(db: String, pattern: String): Seq[FunctionIdentifier] = { val dbName = formatDatabaseName(db) + requireDbExists(dbName) val dbFunctions = externalCatalog.listFunctions(dbName, pattern) .map { f => FunctionIdentifier(f, Some(dbName)) } val loadedFunctions = StringUtils.filterPattern(functionRegistry.listFunction(), pattern) http://git-wip-us.apache.org/repos/asf/spark/blob/5c6b0855/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index a704ca7..f2d2e99 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog import org.apache.spark.SparkFunSuite import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo, Literal} import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias} @@ -69,7 +70,7 @@ class SessionCatalogSuite extends SparkFunSuite { test("get database should throw exception when the database does not exist") { val catalog = new SessionCatalog(newBasicCatalog()) - intercept[AnalysisException] { + intercept[NoSuchDatabaseException] { catalog.getDatabaseMetadata("db_that_does_not_exist") } } @@ -120,7 +121,7 @@ class SessionCatalogSuite extends SparkFunSuite { test("drop database when the database does not exist") { val catalog = new SessionCatalog(newBasicCatalog()) - intercept[AnalysisException] { + intercept[NoSuchDatabaseException] { 
catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = false, cascade = false) } catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = true, cascade = false) @@ -140,8 +141,8 @@ class SessionCatalogSuite extends SparkFunSuite { test("alter database should throw exception when the database does not exist") { val catalog = new SessionCatalog(newBasicCatalog()) - intercept[AnalysisException] { - catalog.alterDatabase(newDb("does_not_exist")) + intercept[NoSuchDatabaseException] { + catalog.alterDatabase(newDb("unknown_db")) } } @@ -150,7 +151,7 @@ class SessionCatalogSuite extends SparkFunSuite { assert(catalog.getCurrentDatabase == "default") catalog.setCurrentDatabase("db2") assert(catalog.getCurrentDatabase == "db2") - intercept[AnalysisException] { + intercept[NoSuchDatabaseException] { catalog.setCurrentDatabase("deebo") } catalog.createDatabase(newDb("deebo"), ignoreIfExists = false) @@ -181,14 +182,14 @@ class SessionCatalogSuite extends SparkFunSuite { test("create table when database does not exist") { val catalog = new SessionCatalog(newBasicCatalog()) // Creating table in non-existent database should always fail - intercept[AnalysisException] { + intercept[NoSuchDatabaseException] { catalog.createTable(newTable("tbl1", "does_not_exist"), ignoreIfExists = false) } - intercept[AnalysisException] { + intercept[NoSuchDatabaseException] { catalog.createTable(newTable("tbl1", "does_not_exist"), ignoreIfExists = true) } // Table already exists - intercept[AnalysisException] { + intercept[TableAlreadyExistsException] { catalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false) } catalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = true) @@ -200,16 +201,16 @@ class SessionCatalogSuite extends SparkFunSuite { val tempTable2 = Range(1, 20, 2, 10, Seq()) catalog.createTempTable("tbl1", tempTable1, overrideIfExists = false) catalog.createTempTable("tbl2", tempTable2, overrideIfExists = false) - 
assert(catalog.getTempTable("tbl1") == Some(tempTable1)) - assert(catalog.getTempTable("tbl2") == Some(tempTable2)) - assert(catalog.getTempTable("tbl3") == None) + assert(catalog.getTempTable("tbl1") == Option(tempTable1)) + assert(catalog.getTempTable("tbl2") == Option(tempTable2)) + assert(catalog.getTempTable("tbl3").isEmpty) // Temporary table already exists - intercept[AnalysisException] { + intercept[TempTableAlreadyExistsException] { catalog.createTempTable("tbl1", tempTable1, overrideIfExists = false) } // Temporary table already exists but we override it catalog.createTempTable("tbl1", tempTable2, overrideIfExists = true) - assert(catalog.getTempTable("tbl1") == Some(tempTable2)) + assert(catalog.getTempTable("tbl1") == Option(tempTable2)) } test("drop table") { @@ -227,13 +228,13 @@ class SessionCatalogSuite extends SparkFunSuite { test("drop table when database/table does not exist") { val catalog = new SessionCatalog(newBasicCatalog()) // Should always throw exception when the database does not exist - intercept[AnalysisException] { + intercept[NoSuchDatabaseException] { catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = false) } - intercept[AnalysisException] { + intercept[NoSuchDatabaseException] { catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = true) } - intercept[AnalysisException] { + intercept[NoSuchTableException] { catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = false) } catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = true) @@ -281,15 +282,20 @@ class SessionCatalogSuite extends SparkFunSuite { sessionCatalog.renameTable( TableIdentifier("tblone", Some("db2")), TableIdentifier("tblones", Some("db1"))) } + // The new table already exists + intercept[TableAlreadyExistsException] { + sessionCatalog.renameTable( + TableIdentifier("tblone", Some("db2")), TableIdentifier("table_two", Some("db2"))) + } } 
test("rename table when database/table does not exist") { val catalog = new SessionCatalog(newBasicCatalog()) - intercept[AnalysisException] { + intercept[NoSuchDatabaseException] { catalog.renameTable( TableIdentifier("tbl1", Some("unknown_db")), TableIdentifier("tbl2", Some("unknown_db"))) } - intercept[AnalysisException] { + intercept[NoSuchTableException] { catalog.renameTable( TableIdentifier("unknown_table", Some("db2")), TableIdentifier("tbl2", Some("db2"))) } @@ -301,18 +307,18 @@ class SessionCatalogSuite extends SparkFunSuite { val tempTable = Range(1, 10, 2, 10, Seq()) sessionCatalog.createTempTable("tbl1", tempTable, overrideIfExists = false) sessionCatalog.setCurrentDatabase("db2") - assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable)) + assert(sessionCatalog.getTempTable("tbl1") == Option(tempTable)) assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) // If database is not specified, temp table should be renamed first sessionCatalog.renameTable(TableIdentifier("tbl1"), TableIdentifier("tbl3")) - assert(sessionCatalog.getTempTable("tbl1") == None) - assert(sessionCatalog.getTempTable("tbl3") == Some(tempTable)) + assert(sessionCatalog.getTempTable("tbl1").isEmpty) + assert(sessionCatalog.getTempTable("tbl3") == Option(tempTable)) assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) // If database is specified, temp tables are never renamed sessionCatalog.renameTable( TableIdentifier("tbl2", Some("db2")), TableIdentifier("tbl4", Some("db2"))) - assert(sessionCatalog.getTempTable("tbl3") == Some(tempTable)) - assert(sessionCatalog.getTempTable("tbl4") == None) + assert(sessionCatalog.getTempTable("tbl3") == Option(tempTable)) + assert(sessionCatalog.getTempTable("tbl4").isEmpty) assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl4")) } @@ -334,10 +340,10 @@ class SessionCatalogSuite extends SparkFunSuite { test("alter table when database/table does not exist") { val catalog = new 
SessionCatalog(newBasicCatalog()) - intercept[AnalysisException] { + intercept[NoSuchDatabaseException] { catalog.alterTable(newTable("tbl1", "unknown_db")) } - intercept[AnalysisException] { + intercept[NoSuchTableException] { catalog.alterTable(newTable("unknown_table", "db2")) } } @@ -355,14 +361,25 @@ class SessionCatalogSuite extends SparkFunSuite { test("get table when database/table does not exist") { val catalog = new SessionCatalog(newBasicCatalog()) - intercept[AnalysisException] { + intercept[NoSuchDatabaseException] { catalog.getTableMetadata(TableIdentifier("tbl1", Some("unknown_db"))) } - intercept[AnalysisException] { + intercept[NoSuchTableException] { catalog.getTableMetadata(TableIdentifier("unknown_table", Some("db2"))) } } + test("get option of table metadata") { + val externalCatalog = newBasicCatalog() + val catalog = new SessionCatalog(externalCatalog) + assert(catalog.getTableMetadataOption(TableIdentifier("tbl1", Some("db2"))) + == Option(externalCatalog.getTable("db2", "tbl1"))) + assert(catalog.getTableMetadataOption(TableIdentifier("unknown_table", Some("db2"))).isEmpty) + intercept[NoSuchDatabaseException] { + catalog.getTableMetadataOption(TableIdentifier("tbl1", Some("unknown_db"))) + } + } + test("lookup table relation") { val externalCatalog = newBasicCatalog() val sessionCatalog = new SessionCatalog(externalCatalog) @@ -427,7 +444,7 @@ class SessionCatalogSuite extends SparkFunSuite { TableIdentifier("tbl4"), TableIdentifier("tbl1", Some("db2")), TableIdentifier("tbl2", Some("db2")))) - intercept[AnalysisException] { + intercept[NoSuchDatabaseException] { catalog.listTables("unknown_db") } } @@ -446,7 +463,7 @@ class SessionCatalogSuite extends SparkFunSuite { TableIdentifier("tbl2", Some("db2")))) assert(catalog.listTables("db2", "*1").toSet == Set(TableIdentifier("tbl1"), TableIdentifier("tbl1", Some("db2")))) - intercept[AnalysisException] { + intercept[NoSuchDatabaseException] { catalog.listTables("unknown_db", "*") } } @@ 
-471,11 +488,11 @@ class SessionCatalogSuite extends SparkFunSuite { test("create partitions when database/table does not exist") { val catalog = new SessionCatalog(newBasicCatalog()) - intercept[AnalysisException] { + intercept[NoSuchDatabaseException] { catalog.createPartitions( - TableIdentifier("tbl1", Some("does_not_exist")), Seq(), ignoreIfExists = false) + TableIdentifier("tbl1", Some("unknown_db")), Seq(), ignoreIfExists = false) } - intercept[AnalysisException] { + intercept[NoSuchTableException] { catalog.createPartitions( TableIdentifier("does_not_exist", Some("db2")), Seq(), ignoreIfExists = false) } @@ -520,13 +537,13 @@ class SessionCatalogSuite extends SparkFunSuite { test("drop partitions when database/table does not exist") { val catalog = new SessionCatalog(newBasicCatalog()) - intercept[AnalysisException] { + intercept[NoSuchDatabaseException] { catalog.dropPartitions( - TableIdentifier("tbl1", Some("does_not_exist")), + TableIdentifier("tbl1", Some("unknown_db")), Seq(), ignoreIfNotExists = false) } - intercept[AnalysisException] { + intercept[NoSuchTableException] { catalog.dropPartitions( TableIdentifier("does_not_exist", Some("db2")), Seq(), @@ -566,10 +583,10 @@ class SessionCatalogSuite extends SparkFunSuite { test("get partition when database/table does not exist") { val catalog = new SessionCatalog(newBasicCatalog()) - intercept[AnalysisException] { - catalog.getPartition(TableIdentifier("tbl1", Some("does_not_exist")), part1.spec) + intercept[NoSuchDatabaseException] { + catalog.getPartition(TableIdentifier("tbl1", Some("unknown_db")), part1.spec) } - intercept[AnalysisException] { + intercept[NoSuchTableException] { catalog.getPartition(TableIdentifier("does_not_exist", Some("db2")), part1.spec) } } @@ -606,11 +623,11 @@ class SessionCatalogSuite extends SparkFunSuite { test("rename partitions when database/table does not exist") { val catalog = new SessionCatalog(newBasicCatalog()) - intercept[AnalysisException] { + 
intercept[NoSuchDatabaseException] { catalog.renamePartitions( - TableIdentifier("tbl1", Some("does_not_exist")), Seq(part1.spec), Seq(part2.spec)) + TableIdentifier("tbl1", Some("unknown_db")), Seq(part1.spec), Seq(part2.spec)) } - intercept[AnalysisException] { + intercept[NoSuchTableException] { catalog.renamePartitions( TableIdentifier("does_not_exist", Some("db2")), Seq(part1.spec), Seq(part2.spec)) } @@ -648,10 +665,10 @@ class SessionCatalogSuite extends SparkFunSuite { test("alter partitions when database/table does not exist") { val catalog = new SessionCatalog(newBasicCatalog()) - intercept[AnalysisException] { - catalog.alterPartitions(TableIdentifier("tbl1", Some("does_not_exist")), Seq(part1)) + intercept[NoSuchDatabaseException] { + catalog.alterPartitions(TableIdentifier("tbl1", Some("unknown_db")), Seq(part1)) } - intercept[AnalysisException] { + intercept[NoSuchTableException] { catalog.alterPartitions(TableIdentifier("does_not_exist", Some("db2")), Seq(part1)) } } @@ -664,6 +681,16 @@ class SessionCatalogSuite extends SparkFunSuite { assert(catalog.listPartitions(TableIdentifier("tbl2")).toSet == Set(part1, part2)) } + test("list partitions when database/table does not exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[NoSuchDatabaseException] { + catalog.listPartitions(TableIdentifier("tbl1", Some("unknown_db"))) + } + intercept[NoSuchTableException] { + catalog.listPartitions(TableIdentifier("does_not_exist", Some("db2"))) + } + } + // -------------------------------------------------------------------------- // Functions // -------------------------------------------------------------------------- @@ -682,7 +709,7 @@ class SessionCatalogSuite extends SparkFunSuite { test("create function when database does not exist") { val catalog = new SessionCatalog(newBasicCatalog()) - intercept[AnalysisException] { + intercept[NoSuchDatabaseException] { catalog.createFunction( newFunc("func5", Some("does_not_exist")), 
ignoreIfExists = false) } @@ -690,7 +717,7 @@ class SessionCatalogSuite extends SparkFunSuite { test("create function that already exists") { val catalog = new SessionCatalog(newBasicCatalog()) - intercept[AnalysisException] { + intercept[FunctionAlreadyExistsException] { catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = false) } catalog.createFunction(newFunc("func1", Some("db2")), ignoreIfExists = true) @@ -708,13 +735,13 @@ class SessionCatalogSuite extends SparkFunSuite { assert(catalog.lookupFunction(FunctionIdentifier("temp1"), arguments) === Literal(1)) assert(catalog.lookupFunction(FunctionIdentifier("temp2"), arguments) === Literal(3)) // Temporary function does not exist. - intercept[AnalysisException] { + intercept[NoSuchFunctionException] { catalog.lookupFunction(FunctionIdentifier("temp3"), arguments) } val tempFunc3 = (e: Seq[Expression]) => Literal(e.size) val info3 = new ExpressionInfo("tempFunc3", "temp1") // Temporary function already exists - intercept[AnalysisException] { + intercept[TempFunctionAlreadyExistsException] { catalog.createTempFunction("temp1", info3, tempFunc3, ignoreIfExists = false) } // Temporary function is overridden @@ -740,11 +767,11 @@ class SessionCatalogSuite extends SparkFunSuite { test("drop function when database/function does not exist") { val catalog = new SessionCatalog(newBasicCatalog()) - intercept[AnalysisException] { + intercept[NoSuchDatabaseException] { catalog.dropFunction( - FunctionIdentifier("something", Some("does_not_exist")), ignoreIfNotExists = false) + FunctionIdentifier("something", Some("unknown_db")), ignoreIfNotExists = false) } - intercept[AnalysisException] { + intercept[NoSuchFunctionException] { catalog.dropFunction(FunctionIdentifier("does_not_exist"), ignoreIfNotExists = false) } catalog.dropFunction(FunctionIdentifier("does_not_exist"), ignoreIfNotExists = true) @@ -758,10 +785,10 @@ class SessionCatalogSuite extends SparkFunSuite { val arguments = Seq(Literal(1), 
Literal(2), Literal(3)) assert(catalog.lookupFunction(FunctionIdentifier("func1"), arguments) === Literal(1)) catalog.dropTempFunction("func1", ignoreIfNotExists = false) - intercept[AnalysisException] { + intercept[NoSuchFunctionException] { catalog.lookupFunction(FunctionIdentifier("func1"), arguments) } - intercept[AnalysisException] { + intercept[NoSuchTempFunctionException] { catalog.dropTempFunction("func1", ignoreIfNotExists = false) } catalog.dropTempFunction("func1", ignoreIfNotExists = true) @@ -780,10 +807,10 @@ class SessionCatalogSuite extends SparkFunSuite { test("get function when database/function does not exist") { val catalog = new SessionCatalog(newBasicCatalog()) - intercept[AnalysisException] { - catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("does_not_exist"))) + intercept[NoSuchDatabaseException] { + catalog.getFunctionMetadata(FunctionIdentifier("func1", Some("unknown_db"))) } - intercept[AnalysisException] { + intercept[NoSuchFunctionException] { catalog.getFunctionMetadata(FunctionIdentifier("does_not_exist", Some("db2"))) } } @@ -796,7 +823,7 @@ class SessionCatalogSuite extends SparkFunSuite { assert(catalog.lookupFunction( FunctionIdentifier("func1"), Seq(Literal(1), Literal(2), Literal(3))) == Literal(1)) catalog.dropTempFunction("func1", ignoreIfNotExists = false) - intercept[AnalysisException] { + intercept[NoSuchFunctionException] { catalog.lookupFunction(FunctionIdentifier("func1"), Seq(Literal(1), Literal(2), Literal(3))) } } @@ -826,4 +853,11 @@ class SessionCatalogSuite extends SparkFunSuite { FunctionIdentifier("func2", Some("db2")))) } + test("list functions when database does not exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[NoSuchDatabaseException] { + catalog.listFunctions("unknown_db", "func*") + } + } + } http://git-wip-us.apache.org/repos/asf/spark/blob/5c6b0855/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 5fbab23..64b90b1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -24,6 +24,7 @@ import org.scalatest.BeforeAndAfterEach import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.DatabaseAlreadyExistsException import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat} import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType} import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog} @@ -212,10 +213,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { expectedLocation, Map.empty)) - val message = intercept[AnalysisException] { + intercept[DatabaseAlreadyExistsException] { sql(s"CREATE DATABASE $dbName") - }.getMessage - assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists.")) + } } finally { catalog.reset() } @@ -280,17 +280,17 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { var message = intercept[AnalysisException] { sql(s"DROP DATABASE $dbName") }.getMessage - assert(message.contains(s"Database '$dbNameWithoutBackTicks' does not exist")) + assert(message.contains(s"Database '$dbNameWithoutBackTicks' not found")) message = intercept[AnalysisException] { sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')") }.getMessage - assert(message.contains(s"Database '$dbNameWithoutBackTicks' does not exist")) + assert(message.contains(s"Database '$dbNameWithoutBackTicks' not found")) message = intercept[AnalysisException] 
{ sql(s"DESCRIBE DATABASE EXTENDED $dbName") }.getMessage - assert(message.contains(s"Database '$dbNameWithoutBackTicks' does not exist")) + assert(message.contains(s"Database '$dbNameWithoutBackTicks' not found")) sql(s"DROP DATABASE IF EXISTS $dbName") } @@ -1014,7 +1014,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { sql("DROP DATABASE DeFault") }.getMessage if (caseSensitive == "true") { - assert(message.contains("Database 'DeFault' does not exist")) + assert(message.contains("Database 'DeFault' not found")) } else { assert(message.contains("Can not drop default database")) } http://git-wip-us.apache.org/repos/asf/spark/blob/5c6b0855/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala index b8fef23..1eed5b6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode} import org.apache.spark.sql.catalyst.analysis.NoSuchTableException -import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton} +import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleton { @@ -122,10 +122,10 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleto } test("show tblproperties for datasource table - errors") { - val message1 = intercept[AnalysisException] { + val message1 = intercept[NoSuchTableException] { sql("SHOW TBLPROPERTIES badtable") }.getMessage - 
assert(message1.contains("'badtable' not found in database 'default'")) + assert(message1.contains("Table or view 'badtable' not found in database 'default'")) // When key is not found, a row containing the error is returned. checkAnswer( http://git-wip-us.apache.org/repos/asf/spark/blob/5c6b0855/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 2e4077d..6ce5051 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -191,20 +191,22 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { allBuiltinFunctions.foreach { f => assert(allFunctions.contains(f)) } - checkAnswer(sql("SHOW functions abs"), Row("abs")) - checkAnswer(sql("SHOW functions 'abs'"), Row("abs")) - checkAnswer(sql("SHOW functions abc.abs"), Row("abs")) - checkAnswer(sql("SHOW functions `abc`.`abs`"), Row("abs")) - checkAnswer(sql("SHOW functions `abc`.`abs`"), Row("abs")) - checkAnswer(sql("SHOW functions `~`"), Row("~")) - checkAnswer(sql("SHOW functions `a function doens't exist`"), Nil) - checkAnswer(sql("SHOW functions `weekofyea*`"), Row("weekofyear")) - // this probably will failed if we add more function with `sha` prefixing. - checkAnswer(sql("SHOW functions `sha*`"), Row("sha") :: Row("sha1") :: Row("sha2") :: Nil) - // Test '|' for alternation. 
- checkAnswer( - sql("SHOW functions 'sha*|weekofyea*'"), - Row("sha") :: Row("sha1") :: Row("sha2") :: Row("weekofyear") :: Nil) + withTempDatabase { db => + checkAnswer(sql("SHOW functions abs"), Row("abs")) + checkAnswer(sql("SHOW functions 'abs'"), Row("abs")) + checkAnswer(sql(s"SHOW functions $db.abs"), Row("abs")) + checkAnswer(sql(s"SHOW functions `$db`.`abs`"), Row("abs")) + checkAnswer(sql(s"SHOW functions `$db`.`abs`"), Row("abs")) + checkAnswer(sql("SHOW functions `~`"), Row("~")) + checkAnswer(sql("SHOW functions `a function doens't exist`"), Nil) + checkAnswer(sql("SHOW functions `weekofyea*`"), Row("weekofyear")) + // this probably will failed if we add more function with `sha` prefixing. + checkAnswer(sql("SHOW functions `sha*`"), Row("sha") :: Row("sha1") :: Row("sha2") :: Nil) + // Test '|' for alternation. + checkAnswer( + sql("SHOW functions 'sha*|weekofyea*'"), + Row("sha") :: Row("sha1") :: Row("sha2") :: Row("weekofyear") :: Nil) + } } test("describe functions - built-in functions") { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org