Repository: spark Updated Branches: refs/heads/branch-2.1 9c5495728 -> 1e8fbefa3
[SPARK-18594][SQL] Name Validation of Databases/Tables ### What changes were proposed in this pull request? Currently, the name validation checks are limited to table creation. It is enforced by Analyzer rule: `PreWriteCheck`. However, table renaming and database creation have the same issues. It makes more sense to do the checks in `SessionCatalog`. This PR is to add it into `SessionCatalog`. ### How was this patch tested? Added test cases Author: gatorsmile <gatorsm...@gmail.com> Closes #16018 from gatorsmile/nameValidate. (cherry picked from commit 07f32c2283e26e86474ba8c9b50125831009a1ea) Signed-off-by: gatorsmile <gatorsm...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1e8fbefa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1e8fbefa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1e8fbefa Branch: refs/heads/branch-2.1 Commit: 1e8fbefa3b61e2deb3dc7d7d3467e4cec69e54ce Parents: 9c54957 Author: gatorsmile <gatorsm...@gmail.com> Authored: Sun Nov 27 19:43:24 2016 -0800 Committer: gatorsmile <gatorsm...@gmail.com> Committed: Sun Nov 27 19:47:17 2016 -0800 ---------------------------------------------------------------------- .../sql/catalyst/catalog/SessionCatalog.scala | 18 +++++++++++++ .../catalyst/catalog/SessionCatalogSuite.scala | 27 +++++++++++++++++++ .../spark/sql/execution/datasources/rules.scala | 28 +++++--------------- .../spark/sql/hive/MultiDatabaseSuite.scala | 11 ++++---- 4 files changed, 57 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1e8fbefa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 19a8fcd..002aecb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -86,6 +86,21 @@ class SessionCatalog( protected var currentDb = formatDatabaseName(DEFAULT_DATABASE) /** + * Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"), + * i.e. if this name only contains characters, numbers, and _. + * + * This method is intended to have the same behavior of + * org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName. + */ + private def validateName(name: String): Unit = { + val validNameFormat = "([\\w_]+)".r + if (!validNameFormat.pattern.matcher(name).matches()) { + throw new AnalysisException(s"`$name` is not a valid name for tables/databases. " + + "Valid names only contain alphabet characters, numbers and _.") + } + } + + /** * Format table name, taking into account case sensitivity. 
*/ protected[this] def formatTableName(name: String): String = { @@ -143,6 +158,7 @@ class SessionCatalog( s"${globalTempViewManager.database} is a system preserved database, " + "you cannot create a database with this name.") } + validateName(dbName) val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString externalCatalog.createDatabase( dbDefinition.copy(name = dbName, locationUri = qualifiedPath), @@ -226,6 +242,7 @@ class SessionCatalog( def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = { val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase)) val table = formatTableName(tableDefinition.identifier.table) + validateName(table) val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db))) requireDbExists(db) externalCatalog.createTable(newTableDefinition, ignoreIfExists) @@ -474,6 +491,7 @@ class SessionCatalog( if (oldName.database.isDefined || !tempTables.contains(oldTableName)) { requireTableExists(TableIdentifier(oldTableName, Some(db))) requireTableNotExists(TableIdentifier(newTableName, Some(db))) + validateName(newTableName) externalCatalog.renameTable(db, oldTableName, newTableName) } else { if (newName.database.isDefined) { http://git-wip-us.apache.org/repos/asf/spark/blob/1e8fbefa/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 52385de..da41d36 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -61,6 +61,22 @@ class SessionCatalogSuite extends SparkFunSuite { 
 assert(!catalog.databaseExists("does_not_exist")) } + def testInvalidName(func: (String) => Unit) { + // scalastyle:off + // non ascii characters are not allowed in the source code, so we disable the scalastyle. + val name = "砖" + // scalastyle:on + val e = intercept[AnalysisException] { + func(name) + }.getMessage + assert(e.contains(s"`$name` is not a valid name for tables/databases.")) + } + + test("create databases using invalid names") { + val catalog = new SessionCatalog(newEmptyCatalog()) + testInvalidName(name => catalog.createDatabase(newDb(name), ignoreIfExists = true)) + } + test("get database when a database exists") { val catalog = new SessionCatalog(newBasicCatalog()) val db1 = catalog.getDatabaseMetadata("db1") @@ -194,6 +210,11 @@ class SessionCatalogSuite extends SparkFunSuite { assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2", "tbl3")) } + test("create tables using invalid names") { + val catalog = new SessionCatalog(newEmptyCatalog()) + testInvalidName(name => catalog.createTable(newTable(name, "db1"), ignoreIfExists = false)) + } + test("create table when database does not exist") { val catalog = new SessionCatalog(newBasicCatalog()) // Creating table in non-existent database should always fail @@ -309,6 +330,12 @@ class SessionCatalogSuite extends SparkFunSuite { } } + test("rename tables to an invalid name") { + val catalog = new SessionCatalog(newBasicCatalog()) + testInvalidName( + name => catalog.renameTable(TableIdentifier("tbl1", Some("db2")), TableIdentifier(name))) + } + test("rename table when database/table does not exist") { val catalog = new SessionCatalog(newBasicCatalog()) intercept[NoSuchDatabaseException] { http://git-wip-us.apache.org/repos/asf/spark/blob/1e8fbefa/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 5ba44ff..7154e3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -309,24 +309,9 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) } - // This regex is used to check if the table name and database name is valid for `CreateTable`. - private val validNameFormat = Pattern.compile("[\\w_]+") - def apply(plan: LogicalPlan): Unit = { plan.foreach { case c @ CreateTable(tableDesc, mode, query) if c.resolved => - // Since we are saving table metadata to metastore, we should make sure the table name - // and database name don't break some common restrictions, e.g. special chars except - // underscore are not allowed. - val tblIdent = tableDesc.identifier - if (!validNameFormat.matcher(tblIdent.table).matches()) { - failAnalysis(s"Table name ${tblIdent.table} is not a valid name for " + - s"metastore. Metastore only accepts table name containing characters, numbers and _.") - } - if (tblIdent.database.exists(db => !validNameFormat.matcher(db).matches())) { - failAnalysis(s"Database name ${tblIdent.database.get} is not a valid name for " + - s"metastore. Metastore only accepts table name containing characters, numbers and _.") - } if (query.isDefined && mode == SaveMode.Overwrite && catalog.tableExists(tableDesc.identifier)) { @@ -334,7 +319,7 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) EliminateSubqueryAliases(catalog.lookupRelation(tableDesc.identifier)) match { // Only do the check if the table is a data source table // (the relation is a BaseRelation). - case l @ LogicalRelation(dest: BaseRelation, _, _) => + case LogicalRelation(dest: BaseRelation, _, _) => // Get all input data source relations of the query. 
val srcRelations = query.get.collect { case LogicalRelation(src: BaseRelation, _, _) => src @@ -347,9 +332,8 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) } } - case i @ logical.InsertIntoTable( - l @ LogicalRelation(t: InsertableRelation, _, _), - partition, query, overwrite, ifNotExists) => + case logical.InsertIntoTable( + l @ LogicalRelation(t: InsertableRelation, _, _), partition, query, _, _) => // Right now, we do not support insert into a data source table with partition specs. if (partition.nonEmpty) { failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.") @@ -367,15 +351,15 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) } case logical.InsertIntoTable( - LogicalRelation(r: HadoopFsRelation, _, _), part, query, overwrite, _) => + LogicalRelation(r: HadoopFsRelation, _, _), part, query, _, _) => // We need to make sure the partition columns specified by users do match partition // columns of the relation. val existingPartitionColumns = r.partitionSchema.fieldNames.toSet val specifiedPartitionColumns = part.keySet if (existingPartitionColumns != specifiedPartitionColumns) { - failAnalysis(s"Specified partition columns " + + failAnalysis("Specified partition columns " + s"(${specifiedPartitionColumns.mkString(", ")}) " + - s"do not match the partition columns of the table. Please use " + + "do not match the partition columns of the table. 
Please use " + s"(${existingPartitionColumns.mkString(", ")}) as the partition columns.") } else { // OK http://git-wip-us.apache.org/repos/asf/spark/blob/1e8fbefa/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala index 9f4401a..7322465 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala @@ -269,17 +269,17 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle val message = intercept[AnalysisException] { df.write.format("parquet").saveAsTable("`d:b`.`t:a`") }.getMessage - assert(message.contains("is not a valid name for metastore")) + assert(message.contains("Database 'd:b' not found")) } { val message = intercept[AnalysisException] { df.write.format("parquet").saveAsTable("`d:b`.`table`") }.getMessage - assert(message.contains("is not a valid name for metastore")) + assert(message.contains("Database 'd:b' not found")) } - withTempPath { dir => + withTempDir { dir => val path = dir.getCanonicalPath { @@ -293,7 +293,8 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle |) """.stripMargin) }.getMessage - assert(message.contains("is not a valid name for metastore")) + assert(message.contains("`t:a` is not a valid name for tables/databases. 
" + + "Valid names only contain alphabet characters, numbers and _.")) } { @@ -307,7 +308,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle |) """.stripMargin) }.getMessage - assert(message.contains("is not a valid name for metastore")) + assert(message.contains("Database 'd:b' not found")) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org