Repository: spark Updated Branches: refs/heads/master c95fbea68 -> 26e7bca22
[SPARK-20198][SQL] Remove the inconsistency in table/function name conventions in SparkSession.Catalog APIs ### What changes were proposed in this pull request? Observed by felixcheung , in `SparkSession`.`Catalog` APIs, we have different conventions/rules for table/function identifiers/names. Most APIs accept the qualified name (i.e., `databaseName`.`tableName` or `databaseName`.`functionName`). However, the following five APIs do not accept it. - def listColumns(tableName: String): Dataset[Column] - def getTable(tableName: String): Table - def getFunction(functionName: String): Function - def tableExists(tableName: String): Boolean - def functionExists(functionName: String): Boolean To make them consistent with the other Catalog APIs, this PR does the changes, updates the function/API comments and adds the `params` to clarify the inputs we allow. ### How was this patch tested? Added the test cases . Author: Xiao Li <gatorsm...@gmail.com> Closes #17518 from gatorsmile/tableIdentifier. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26e7bca2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26e7bca2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26e7bca2 Branch: refs/heads/master Commit: 26e7bca2295faeef22b2d9554f316c97bc240fd7 Parents: c95fbea Author: Xiao Li <gatorsm...@gmail.com> Authored: Tue Apr 4 18:57:46 2017 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Tue Apr 4 18:57:46 2017 +0800 ---------------------------------------------------------------------- .../apache/spark/sql/catalyst/parser/SqlBase.g4 | 8 ++ .../spark/sql/catalyst/parser/AstBuilder.scala | 13 +++ .../spark/sql/catalyst/parser/ParseDriver.scala | 7 +- .../sql/catalyst/parser/ParserInterface.scala | 5 +- .../org/apache/spark/sql/SparkSession.scala | 7 +- .../org/apache/spark/sql/catalog/Catalog.scala | 109 +++++++++++++++---- .../apache/spark/sql/internal/CatalogImpl.scala | 73 +++++++------ 
.../spark/sql/internal/CatalogSuite.scala | 21 ++++ 8 files changed, 186 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/26e7bca2/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index c4a590e..52b5b34 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -56,6 +56,10 @@ singleTableIdentifier : tableIdentifier EOF ; +singleFunctionIdentifier + : functionIdentifier EOF + ; + singleDataType : dataType EOF ; @@ -493,6 +497,10 @@ tableIdentifier : (db=identifier '.')? table=identifier ; +functionIdentifier + : (db=identifier '.')? function=identifier + ; + namedExpression : expression (AS? (identifier | identifierList))? 
; http://git-wip-us.apache.org/repos/asf/spark/blob/26e7bca2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 162051a..fab7e4c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -75,6 +75,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { visitTableIdentifier(ctx.tableIdentifier) } + override def visitSingleFunctionIdentifier( + ctx: SingleFunctionIdentifierContext): FunctionIdentifier = withOrigin(ctx) { + visitFunctionIdentifier(ctx.functionIdentifier) + } + override def visitSingleDataType(ctx: SingleDataTypeContext): DataType = withOrigin(ctx) { visitSparkDataType(ctx.dataType) } @@ -759,6 +764,14 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { TableIdentifier(ctx.table.getText, Option(ctx.db).map(_.getText)) } + /** + * Create a [[FunctionIdentifier]] from a 'functionName' or 'databaseName'.'functionName' pattern. 
+ */ + override def visitFunctionIdentifier( + ctx: FunctionIdentifierContext): FunctionIdentifier = withOrigin(ctx) { + FunctionIdentifier(ctx.function.getText, Option(ctx.db).map(_.getText)) + } + /* ******************************************************************************************** * Expression parsing * ******************************************************************************************** */ http://git-wip-us.apache.org/repos/asf/spark/blob/26e7bca2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala index f704b09..80ab75c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParseDriver.scala @@ -22,7 +22,7 @@ import org.antlr.v4.runtime.misc.ParseCancellationException import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.trees.Origin @@ -49,6 +49,11 @@ abstract class AbstractSqlParser extends ParserInterface with Logging { astBuilder.visitSingleTableIdentifier(parser.singleTableIdentifier()) } + /** Creates FunctionIdentifier for a given SQL string. 
*/ + def parseFunctionIdentifier(sqlText: String): FunctionIdentifier = parse(sqlText) { parser => + astBuilder.visitSingleFunctionIdentifier(parser.singleFunctionIdentifier()) + } + /** * Creates StructType for a given SQL string, which is a comma separated list of field * definitions which will preserve the correct Hive metadata. http://git-wip-us.apache.org/repos/asf/spark/blob/26e7bca2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala index 6edbe25..db3598b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/ParserInterface.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.catalyst.parser -import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.types.StructType @@ -35,6 +35,9 @@ trait ParserInterface { /** Creates TableIdentifier for a given SQL string. */ def parseTableIdentifier(sqlText: String): TableIdentifier + /** Creates FunctionIdentifier for a given SQL string. */ + def parseFunctionIdentifier(sqlText: String): FunctionIdentifier + /** * Creates StructType for a given SQL string, which is a comma separated list of field * definitions which will preserve the correct Hive metadata. 
http://git-wip-us.apache.org/repos/asf/spark/blob/26e7bca2/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index b604992..95f3463 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -591,8 +591,13 @@ class SparkSession private( @transient lazy val catalog: Catalog = new CatalogImpl(self) /** - * Returns the specified table as a `DataFrame`. + * Returns the specified table/view as a `DataFrame`. * + * @param tableName is either a qualified or unqualified name that designates a table or view. + * If a database is specified, it identifies the table/view from the database. + * Otherwise, it first attempts to find a temporary view with the given name + * and then match the table/view from the current database. + * Note that, the global temporary view database is also valid here. * @since 2.0.0 */ def table(tableName: String): DataFrame = { http://git-wip-us.apache.org/repos/asf/spark/blob/26e7bca2/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index 50252db..137b0cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -54,16 +54,16 @@ abstract class Catalog { def listDatabases(): Dataset[Database] /** - * Returns a list of tables in the current database. - * This includes all temporary tables. + * Returns a list of tables/views in the current database. + * This includes all temporary views. 
* * @since 2.0.0 */ def listTables(): Dataset[Table] /** - * Returns a list of tables in the specified database. - * This includes all temporary tables. + * Returns a list of tables/views in the specified database. + * This includes all temporary views. * * @since 2.0.0 */ @@ -88,17 +88,21 @@ abstract class Catalog { def listFunctions(dbName: String): Dataset[Function] /** - * Returns a list of columns for the given table in the current database or - * the given temporary table. + * Returns a list of columns for the given table/view or temporary view. * + * @param tableName is either a qualified or unqualified name that designates a table/view. + * If no database identifier is provided, it refers to a temporary view or + * a table/view in the current database. * @since 2.0.0 */ @throws[AnalysisException]("table does not exist") def listColumns(tableName: String): Dataset[Column] /** - * Returns a list of columns for the given table in the specified database. + * Returns a list of columns for the given table/view in the specified database. * + * @param dbName is a name that designates a database. + * @param tableName is an unqualified name that designates a table/view. * @since 2.0.0 */ @throws[AnalysisException]("database or table does not exist") @@ -115,9 +119,11 @@ abstract class Catalog { /** * Get the table or view with the specified name. This table can be a temporary view or a - * table/view in the current database. This throws an AnalysisException when no Table - * can be found. + * table/view. This throws an AnalysisException when no Table can be found. * + * @param tableName is either a qualified or unqualified name that designates a table/view. + * If no database identifier is provided, it refers to a table/view in + * the current database. * @since 2.1.0 */ @throws[AnalysisException]("table does not exist") @@ -134,9 +140,11 @@ abstract class Catalog { /** * Get the function with the specified name. 
This function can be a temporary function or a - * function in the current database. This throws an AnalysisException when the function cannot - * be found. + * function. This throws an AnalysisException when the function cannot be found. * + * @param functionName is either a qualified or unqualified name that designates a function. + * If no database identifier is provided, it refers to a temporary function + * or a function in the current database. * @since 2.1.0 */ @throws[AnalysisException]("function does not exist") @@ -146,6 +154,8 @@ abstract class Catalog { * Get the function with the specified name. This throws an AnalysisException when the function * cannot be found. * + * @param dbName is a name that designates a database. + * @param functionName is an unqualified name that designates a function in the specified database * @since 2.1.0 */ @throws[AnalysisException]("database or function does not exist") @@ -160,8 +170,11 @@ abstract class Catalog { /** * Check if the table or view with the specified name exists. This can either be a temporary - * view or a table/view in the current database. + * view or a table/view. * + * @param tableName is either a qualified or unqualified name that designates a table/view. + * If no database identifier is provided, it refers to a table/view in + * the current database. * @since 2.1.0 */ def tableExists(tableName: String): Boolean @@ -169,14 +182,19 @@ abstract class Catalog { /** * Check if the table or view with the specified name exists in the specified database. * + * @param dbName is a name that designates a database. + * @param tableName is an unqualified name that designates a table. * @since 2.1.0 */ def tableExists(dbName: String, tableName: String): Boolean /** * Check if the function with the specified name exists. This can either be a temporary function - * or a function in the current database. + * or a function. 
* + * @param functionName is either a qualified or unqualified name that designates a function. + * If no database identifier is provided, it refers to a function in + * the current database. * @since 2.1.0 */ def functionExists(functionName: String): Boolean @@ -184,6 +202,8 @@ abstract class Catalog { /** * Check if the function with the specified name exists in the specified database. * + * @param dbName is a name that designates a database. + * @param functionName is an unqualified name that designates a function. * @since 2.1.0 */ def functionExists(dbName: String, functionName: String): Boolean @@ -192,6 +212,9 @@ abstract class Catalog { * Creates a table from the given path and returns the corresponding DataFrame. * It will use the default data source configured by spark.sql.sources.default. * + * @param tableName is either a qualified or unqualified name that designates a table. + * If no database identifier is provided, it refers to a table in + * the current database. * @since 2.0.0 */ @deprecated("use createTable instead.", "2.2.0") @@ -204,6 +227,9 @@ abstract class Catalog { * Creates a table from the given path and returns the corresponding DataFrame. * It will use the default data source configured by spark.sql.sources.default. * + * @param tableName is either a qualified or unqualified name that designates a table. + * If no database identifier is provided, it refers to a table in + * the current database. * @since 2.2.0 */ @Experimental @@ -214,6 +240,9 @@ abstract class Catalog { * Creates a table from the given path based on a data source and returns the corresponding * DataFrame. * + * @param tableName is either a qualified or unqualified name that designates a table. + * If no database identifier is provided, it refers to a table in + * the current database. 
* @since 2.0.0 */ @deprecated("use createTable instead.", "2.2.0") @@ -226,6 +255,9 @@ abstract class Catalog { * Creates a table from the given path based on a data source and returns the corresponding * DataFrame. * + * @param tableName is either a qualified or unqualified name that designates a table. + * If no database identifier is provided, it refers to a table in + * the current database. * @since 2.2.0 */ @Experimental @@ -236,6 +268,9 @@ abstract class Catalog { * Creates a table from the given path based on a data source and a set of options. * Then, returns the corresponding DataFrame. * + * @param tableName is either a qualified or unqualified name that designates a table. + * If no database identifier is provided, it refers to a table in + * the current database. * @since 2.0.0 */ @deprecated("use createTable instead.", "2.2.0") @@ -251,6 +286,9 @@ abstract class Catalog { * Creates a table from the given path based on a data source and a set of options. * Then, returns the corresponding DataFrame. * + * @param tableName is either a qualified or unqualified name that designates a table. + * If no database identifier is provided, it refers to a table in + * the current database. * @since 2.2.0 */ @Experimental @@ -267,6 +305,9 @@ abstract class Catalog { * Creates a table from the given path based on a data source and a set of options. * Then, returns the corresponding DataFrame. * + * @param tableName is either a qualified or unqualified name that designates a table. + * If no database identifier is provided, it refers to a table in + * the current database. * @since 2.0.0 */ @deprecated("use createTable instead.", "2.2.0") @@ -283,6 +324,9 @@ abstract class Catalog { * Creates a table from the given path based on a data source and a set of options. * Then, returns the corresponding DataFrame. * + * @param tableName is either a qualified or unqualified name that designates a table. 
+ * If no database identifier is provided, it refers to a table in + * the current database. * @since 2.2.0 */ @Experimental @@ -297,6 +341,9 @@ abstract class Catalog { * Create a table from the given path based on a data source, a schema and a set of options. * Then, returns the corresponding DataFrame. * + * @param tableName is either a qualified or unqualified name that designates a table. + * If no database identifier is provided, it refers to a table in + * the current database. * @since 2.0.0 */ @deprecated("use createTable instead.", "2.2.0") @@ -313,6 +360,9 @@ abstract class Catalog { * Create a table from the given path based on a data source, a schema and a set of options. * Then, returns the corresponding DataFrame. * + * @param tableName is either a qualified or unqualified name that designates a table. + * If no database identifier is provided, it refers to a table in + * the current database. * @since 2.2.0 */ @Experimental @@ -330,6 +380,9 @@ abstract class Catalog { * Create a table from the given path based on a data source, a schema and a set of options. * Then, returns the corresponding DataFrame. * + * @param tableName is either a qualified or unqualified name that designates a table. + * If no database identifier is provided, it refers to a table in + * the current database. * @since 2.0.0 */ @deprecated("use createTable instead.", "2.2.0") @@ -347,6 +400,9 @@ abstract class Catalog { * Create a table from the given path based on a data source, a schema and a set of options. * Then, returns the corresponding DataFrame. * + * @param tableName is either a qualified or unqualified name that designates a table. + * If no database identifier is provided, it refers to a table in + * the current database. * @since 2.2.0 */ @Experimental @@ -368,7 +424,7 @@ abstract class Catalog { * Note that, the return type of this method was Unit in Spark 2.0, but changed to Boolean * in Spark 2.1. * - * @param viewName the name of the view to be dropped. 
+ * @param viewName the name of the temporary view to be dropped. * @return true if the view is dropped successfully, false otherwise. * @since 2.0.0 */ @@ -383,15 +439,18 @@ abstract class Catalog { * preserved database `global_temp`, and we must use the qualified name to refer a global temp * view, e.g. `SELECT * FROM global_temp.view1`. * - * @param viewName the name of the view to be dropped. + * @param viewName the unqualified name of the temporary view to be dropped. * @return true if the view is dropped successfully, false otherwise. * @since 2.1.0 */ def dropGlobalTempView(viewName: String): Boolean /** - * Recover all the partitions in the directory of a table and update the catalog. + * Recovers all the partitions in the directory of a table and update the catalog. * + * @param tableName is either a qualified or unqualified name that designates a table. + * If no database identifier is provided, it refers to a table in the + * current database. * @since 2.1.1 */ def recoverPartitions(tableName: String): Unit @@ -399,6 +458,9 @@ abstract class Catalog { /** * Returns true if the table is currently cached in-memory. * + * @param tableName is either a qualified or unqualified name that designates a table/view. + * If no database identifier is provided, it refers to a temporary view or + * a table/view in the current database. * @since 2.0.0 */ def isCached(tableName: String): Boolean @@ -406,6 +468,9 @@ abstract class Catalog { /** * Caches the specified table in-memory. * + * @param tableName is either a qualified or unqualified name that designates a table/view. + * If no database identifier is provided, it refers to a temporary view or + * a table/view in the current database. * @since 2.0.0 */ def cacheTable(tableName: String): Unit @@ -413,6 +478,9 @@ abstract class Catalog { /** * Removes the specified table from the in-memory cache. * + * @param tableName is either a qualified or unqualified name that designates a table/view. 
+ * If no database identifier is provided, it refers to a temporary view or + * a table/view in the current database. * @since 2.0.0 */ def uncacheTable(tableName: String): Unit @@ -425,7 +493,7 @@ abstract class Catalog { def clearCache(): Unit /** - * Invalidate and refresh all the cached metadata of the given table. For performance reasons, + * Invalidates and refreshes all the cached metadata of the given table. For performance reasons, * Spark SQL or the external data source library it uses might cache certain metadata about a * table, such as the location of blocks. When those change outside of Spark SQL, users should * call this function to invalidate the cache. @@ -433,13 +501,16 @@ abstract class Catalog { * If this table is cached as an InMemoryRelation, drop the original cached version and make the * new version cached lazily. * + * @param tableName is either a qualified or unqualified name that designates a table/view. + * If no database identifier is provided, it refers to a temporary view or + * a table/view in the current database. * @since 2.0.0 */ def refreshTable(tableName: String): Unit /** - * Invalidate and refresh all the cached data (and the associated metadata) for any dataframe that - * contains the given data source path. Path matching is by prefix, i.e. "/" would invalidate + * Invalidates and refreshes all the cached data (and the associated metadata) for any [[Dataset]] + * that contains the given data source path. Path matching is by prefix, i.e. "/" would invalidate * everything that is cached. 
* * @since 2.0.0 http://git-wip-us.apache.org/repos/asf/spark/blob/26e7bca2/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 5337485..5d1c35a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.internal import scala.reflect.runtime.universe.TypeTag -import org.apache.hadoop.fs.Path - import org.apache.spark.annotation.Experimental import org.apache.spark.sql._ import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table} @@ -143,11 +141,12 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { } /** - * Returns a list of columns for the given table in the current database. + * Returns a list of columns for the given table/temporary view. */ @throws[AnalysisException]("table does not exist") override def listColumns(tableName: String): Dataset[Column] = { - listColumns(TableIdentifier(tableName, None)) + val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) + listColumns(tableIdent) } /** @@ -177,7 +176,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { } /** - * Get the database with the specified name. This throws an `AnalysisException` when no + * Gets the database with the specified name. This throws an `AnalysisException` when no * `Database` can be found. */ override def getDatabase(dbName: String): Database = { @@ -185,16 +184,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { } /** - * Get the table or view with the specified name. This table can be a temporary view or a - * table/view in the current database. 
This throws an `AnalysisException` when no `Table` - * can be found. + * Gets the table or view with the specified name. This table can be a temporary view or a + * table/view. This throws an `AnalysisException` when no `Table` can be found. */ override def getTable(tableName: String): Table = { - getTable(null, tableName) + val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) + getTable(tableIdent.database.orNull, tableIdent.table) } /** - * Get the table or view with the specified name in the specified database. This throws an + * Gets the table or view with the specified name in the specified database. This throws an * `AnalysisException` when no `Table` can be found. */ override def getTable(dbName: String, tableName: String): Table = { @@ -202,16 +201,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { } /** - * Get the function with the specified name. This function can be a temporary function or a - * function in the current database. This throws an `AnalysisException` when no `Function` - * can be found. + * Gets the function with the specified name. This function can be a temporary function or a + * function. This throws an `AnalysisException` when no `Function` can be found. */ override def getFunction(functionName: String): Function = { - getFunction(null, functionName) + val functionIdent = sparkSession.sessionState.sqlParser.parseFunctionIdentifier(functionName) + getFunction(functionIdent.database.orNull, functionIdent.funcName) } /** - * Get the function with the specified name. This returns `None` when no `Function` can be + * Gets the function with the specified name. This returns `None` when no `Function` can be * found. */ override def getFunction(dbName: String, functionName: String): Function = { @@ -219,22 +218,23 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { } /** - * Check if the database with the specified name exists. 
+ * Checks if the database with the specified name exists. */ override def databaseExists(dbName: String): Boolean = { sessionCatalog.databaseExists(dbName) } /** - * Check if the table or view with the specified name exists. This can either be a temporary - * view or a table/view in the current database. + * Checks if the table or view with the specified name exists. This can either be a temporary + * view or a table/view. */ override def tableExists(tableName: String): Boolean = { - tableExists(null, tableName) + val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) + tableExists(tableIdent.database.orNull, tableIdent.table) } /** - * Check if the table or view with the specified name exists in the specified database. + * Checks if the table or view with the specified name exists in the specified database. */ override def tableExists(dbName: String, tableName: String): Boolean = { val tableIdent = TableIdentifier(tableName, Option(dbName)) @@ -242,15 +242,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { } /** - * Check if the function with the specified name exists. This can either be a temporary function - * or a function in the current database. + * Checks if the function with the specified name exists. This can either be a temporary function + * or a function. */ override def functionExists(functionName: String): Boolean = { - functionExists(null, functionName) + val functionIdent = sparkSession.sessionState.sqlParser.parseFunctionIdentifier(functionName) + functionExists(functionIdent.database.orNull, functionIdent.funcName) } /** - * Check if the function with the specified name exists in the specified database. + * Checks if the function with the specified name exists in the specified database. 
*/ override def functionExists(dbName: String, functionName: String): Boolean = { sessionCatalog.functionExists(FunctionIdentifier(functionName, Option(dbName))) @@ -303,7 +304,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { /** * :: Experimental :: * (Scala-specific) - * Create a table from the given path based on a data source, a schema and a set of options. + * Creates a table from the given path based on a data source, a schema and a set of options. * Then, returns the corresponding DataFrame. * * @group ddl_ops @@ -338,7 +339,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * Drops the local temporary view with the given view name in the catalog. * If the view has been cached/persisted before, it's also unpersisted. * - * @param viewName the name of the view to be dropped. + * @param viewName the identifier of the temporary view to be dropped. * @group ddl_ops * @since 2.0.0 */ @@ -353,7 +354,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * Drops the global temporary view with the given view name in the catalog. * If the view has been cached/persisted before, it's also unpersisted. * - * @param viewName the name of the view to be dropped. + * @param viewName the identifier of the global temporary view to be dropped. * @group ddl_ops * @since 2.1.0 */ @@ -365,9 +366,11 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { } /** - * Recover all the partitions in the directory of a table and update the catalog. + * Recovers all the partitions in the directory of a table and update the catalog. * - * @param tableName the name of the table to be repaired. + * @param tableName is either a qualified or unqualified name that designates a table. + * If no database identifier is provided, it refers to a table in the + * current database. 
* @group ddl_ops * @since 2.1.1 */ @@ -378,7 +381,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { } /** - * Returns true if the table is currently cached in-memory. + * Returns true if the table or view is currently cached in-memory. * * @group cachemgmt * @since 2.0.0 @@ -388,7 +391,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { } /** - * Caches the specified table in-memory. + * Caches the specified table or view in-memory. * * @group cachemgmt * @since 2.0.0 @@ -398,7 +401,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { } /** - * Removes the specified table from the in-memory cache. + * Removes the specified table or view from the in-memory cache. * * @group cachemgmt * @since 2.0.0 @@ -408,7 +411,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { } /** - * Removes all cached tables from the in-memory cache. + * Removes all cached tables or views from the in-memory cache. * * @group cachemgmt * @since 2.0.0 @@ -428,7 +431,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { } /** - * Refresh the cache entry for a table, if any. For Hive metastore table, the metadata + * Refreshes the cache entry for a table or view, if any. For Hive metastore table, the metadata * is refreshed. For data source tables, the schema will not be inferred and refreshed. * * @group cachemgmt @@ -452,7 +455,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { } /** - * Refresh the cache entry and the associated metadata for all dataframes (if any), that contain + * Refreshes the cache entry and the associated metadata for all Dataset (if any), that contain * the given data source path. 
* * @group cachemgmt http://git-wip-us.apache.org/repos/asf/spark/blob/26e7bca2/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 9742b3b..6469e50 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -102,6 +102,11 @@ class CatalogSuite assert(col.isPartition == tableMetadata.partitionColumnNames.contains(col.name)) assert(col.isBucket == bucketColumnNames.contains(col.name)) } + + dbName.foreach { db => + val expected = columns.collect().map(_.name).toSet + assert(spark.catalog.listColumns(s"$db.$tableName").collect().map(_.name).toSet == expected) + } } override def afterEach(): Unit = { @@ -345,6 +350,7 @@ class CatalogSuite // Find a qualified table assert(spark.catalog.getTable(db, "tbl_y").name === "tbl_y") + assert(spark.catalog.getTable(s"$db.tbl_y").name === "tbl_y") // Find an unqualified table using the current database intercept[AnalysisException](spark.catalog.getTable("tbl_y")) @@ -378,6 +384,11 @@ class CatalogSuite assert(fn2.database === db) assert(!fn2.isTemporary) + val fn2WithQualifiedName = spark.catalog.getFunction(s"$db.fn2") + assert(fn2WithQualifiedName.name === "fn2") + assert(fn2WithQualifiedName.database === db) + assert(!fn2WithQualifiedName.isTemporary) + // Find an unqualified function using the current database intercept[AnalysisException](spark.catalog.getFunction("fn2")) spark.catalog.setCurrentDatabase(db) @@ -403,6 +414,7 @@ class CatalogSuite assert(!spark.catalog.tableExists("tbl_x")) assert(!spark.catalog.tableExists("tbl_y")) assert(!spark.catalog.tableExists(db, "tbl_y")) + assert(!spark.catalog.tableExists(s"$db.tbl_y")) // Create objects. 
createTempTable("tbl_x") @@ -413,11 +425,15 @@ class CatalogSuite // Find a qualified table assert(spark.catalog.tableExists(db, "tbl_y")) + assert(spark.catalog.tableExists(s"$db.tbl_y")) // Find an unqualified table using the current database assert(!spark.catalog.tableExists("tbl_y")) spark.catalog.setCurrentDatabase(db) assert(spark.catalog.tableExists("tbl_y")) + + // Unable to find the table, although the temp view with the given name exists + assert(!spark.catalog.tableExists(db, "tbl_x")) } } } @@ -429,6 +445,7 @@ class CatalogSuite assert(!spark.catalog.functionExists("fn1")) assert(!spark.catalog.functionExists("fn2")) assert(!spark.catalog.functionExists(db, "fn2")) + assert(!spark.catalog.functionExists(s"$db.fn2")) // Create objects. createTempFunction("fn1") @@ -439,11 +456,15 @@ class CatalogSuite // Find a qualified function assert(spark.catalog.functionExists(db, "fn2")) + assert(spark.catalog.functionExists(s"$db.fn2")) // Find an unqualified function using the current database assert(!spark.catalog.functionExists("fn2")) spark.catalog.setCurrentDatabase(db) assert(spark.catalog.functionExists("fn2")) + + // Unable to find the function, although the temp function with the given name exists + assert(!spark.catalog.functionExists(db, "fn1")) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org