[SPARK-14014][SQL] Integrate session catalog (attempt #2) ## What changes were proposed in this pull request?
This reopens #11836, which was merged but promptly reverted because it introduced flaky Hive tests. ## How was this patch tested? See `CatalogTestCases`, `SessionCatalogSuite` and `HiveContextSuite`. Author: Andrew Or <[email protected]> Closes #11938 from andrewor14/session-catalog-again. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20ddf5fd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20ddf5fd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20ddf5fd Branch: refs/heads/master Commit: 20ddf5fddf40b543edc61d6e4687988489dea64c Parents: 1c70b76 Author: Andrew Or <[email protected]> Authored: Thu Mar 24 22:59:35 2016 -0700 Committer: Andrew Or <[email protected]> Committed: Thu Mar 24 22:59:35 2016 -0700 ---------------------------------------------------------------------- R/pkg/inst/tests/testthat/test_sparkSQL.R | 3 +- project/MimaExcludes.scala | 3 + python/pyspark/sql/context.py | 2 +- .../spark/sql/catalyst/analysis/Analyzer.scala | 20 +- .../spark/sql/catalyst/analysis/Catalog.scala | 218 -------- .../sql/catalyst/analysis/unresolved.scala | 2 +- .../sql/catalyst/catalog/InMemoryCatalog.scala | 35 +- .../sql/catalyst/catalog/SessionCatalog.scala | 123 +++-- .../spark/sql/catalyst/catalog/interface.scala | 2 + .../sql/catalyst/analysis/AnalysisSuite.scala | 6 +- .../sql/catalyst/analysis/AnalysisTest.scala | 23 +- .../analysis/DecimalPrecisionSuite.scala | 25 +- .../sql/catalyst/catalog/CatalogTestCases.scala | 3 +- .../catalyst/catalog/SessionCatalogSuite.scala | 20 +- .../optimizer/BooleanSimplificationSuite.scala | 11 +- .../optimizer/EliminateSortsSuite.scala | 5 +- .../scala/org/apache/spark/sql/SQLContext.scala | 73 ++- .../spark/sql/execution/command/commands.scala | 8 +- .../spark/sql/execution/datasources/ddl.scala | 24 +- .../spark/sql/execution/datasources/rules.scala | 10 +- .../spark/sql/internal/SessionState.scala | 7 +- 
.../org/apache/spark/sql/ListTablesSuite.scala | 15 +- .../org/apache/spark/sql/SQLContextSuite.scala | 9 +- .../org/apache/spark/sql/SQLQuerySuite.scala | 22 +- .../datasources/parquet/ParquetQuerySuite.scala | 6 +- .../apache/spark/sql/test/SQLTestUtils.scala | 4 +- .../hive/thriftserver/SparkSQLCLIDriver.scala | 3 +- .../spark/sql/hive/thriftserver/CliSuite.scala | 5 +- .../hive/execution/HiveCompatibilitySuite.scala | 23 +- .../org/apache/spark/sql/hive/HiveCatalog.scala | 5 +- .../org/apache/spark/sql/hive/HiveContext.scala | 498 ++++++++++--------- .../spark/sql/hive/HiveMetastoreCatalog.scala | 60 +-- .../spark/sql/hive/HiveSessionCatalog.scala | 104 ++++ .../spark/sql/hive/HiveSessionState.scala | 10 +- .../spark/sql/hive/client/HiveClient.scala | 3 - .../spark/sql/hive/client/HiveClientImpl.scala | 4 - .../hive/execution/CreateTableAsSelect.scala | 4 +- .../sql/hive/execution/CreateViewAsSelect.scala | 4 +- .../hive/execution/InsertIntoHiveTable.scala | 14 +- .../spark/sql/hive/execution/commands.scala | 9 +- .../apache/spark/sql/hive/test/TestHive.scala | 172 +++++-- .../sql/hive/JavaMetastoreDataSourcesSuite.java | 5 +- .../spark/sql/hive/test/TestHiveSingleton.scala | 38 ++ .../spark/sql/hive/ExpressionToSQLSuite.scala | 11 +- .../spark/sql/hive/HiveContextSuite.scala | 37 ++ .../sql/hive/HiveDataFrameAnalyticsSuite.scala | 7 +- .../sql/hive/HiveMetastoreCatalogSuite.scala | 10 +- .../apache/spark/sql/hive/ListTablesSuite.scala | 17 +- .../spark/sql/hive/LogicalPlanToSQLSuite.scala | 15 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 32 +- .../spark/sql/hive/MultiDatabaseSuite.scala | 5 +- .../apache/spark/sql/hive/StatisticsSuite.scala | 3 +- .../spark/sql/hive/client/VersionsSuite.scala | 4 - .../hive/execution/AggregationQuerySuite.scala | 12 +- .../sql/hive/execution/HiveQuerySuite.scala | 16 +- .../spark/sql/hive/execution/HiveUDFSuite.scala | 2 +- .../sql/hive/execution/SQLQuerySuite.scala | 4 +- .../sql/hive/execution/WindowQuerySuite.scala | 7 
+- .../spark/sql/hive/orc/OrcQuerySuite.scala | 4 +- .../spark/sql/hive/orc/OrcSourceSuite.scala | 8 +- .../apache/spark/sql/hive/parquetSuites.scala | 25 +- 61 files changed, 1043 insertions(+), 816 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/R/pkg/inst/tests/testthat/test_sparkSQL.R ---------------------------------------------------------------------- diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 63acbad..eef365b 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1817,7 +1817,8 @@ test_that("approxQuantile() on a DataFrame", { test_that("SQL error message is returned from JVM", { retError <- tryCatch(sql(sqlContext, "select * from blah"), error = function(e) e) - expect_equal(grepl("Table not found: blah", retError), TRUE) + expect_equal(grepl("Table not found", retError), TRUE) + expect_equal(grepl("blah", retError), TRUE) }) irisDF <- suppressWarnings(createDataFrame(sqlContext, iris)) http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 42eafcb..9158983 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -563,6 +563,9 @@ object MimaExcludes { ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerEvent.logEvent"), ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.OutputWriterFactory.newInstance") ) ++ Seq( + // [SPARK-14014] Replace existing analysis.Catalog with SessionCatalog + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SQLContext.this") + ) ++ Seq( // [SPARK-13928] Move org.apache.spark.Logging into org.apache.spark.internal.Logging 
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Logging"), (problem: Problem) => problem match { http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/python/pyspark/sql/context.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 9c2f6a3..4008332 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -554,7 +554,7 @@ class SQLContext(object): >>> sqlContext.registerDataFrameAsTable(df, "table1") >>> "table1" in sqlContext.tableNames() True - >>> "table1" in sqlContext.tableNames("db") + >>> "table1" in sqlContext.tableNames("default") True """ if dbName is None: http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index d0a31e7..b344e04 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{CatalystConf, ScalaReflection, SimpleCatalystConf} +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.encoders.OuterScopes import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -36,23 +37,22 @@ import org.apache.spark.sql.catalyst.util.usePrettyExpression import org.apache.spark.sql.types._ /** - * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. 
Used for testing - * when all relations are already filled in and the analyzer needs only to resolve attribute - * references. + * A trivial [[Analyzer]] with a dummy [[SessionCatalog]] and [[EmptyFunctionRegistry]]. + * Used for testing when all relations are already filled in and the analyzer needs only + * to resolve attribute references. */ object SimpleAnalyzer - extends Analyzer( - EmptyCatalog, - EmptyFunctionRegistry, - new SimpleCatalystConf(caseSensitiveAnalysis = true)) + extends SimpleAnalyzer(new SimpleCatalystConf(caseSensitiveAnalysis = true)) +class SimpleAnalyzer(conf: CatalystConf) + extends Analyzer(new SessionCatalog(new InMemoryCatalog, conf), EmptyFunctionRegistry, conf) /** * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and - * [[UnresolvedRelation]]s into fully typed objects using information in a schema [[Catalog]] and - * a [[FunctionRegistry]]. + * [[UnresolvedRelation]]s into fully typed objects using information in a + * [[SessionCatalog]] and a [[FunctionRegistry]]. */ class Analyzer( - catalog: Catalog, + catalog: SessionCatalog, registry: FunctionRegistry, conf: CatalystConf, maxIterations: Int = 100) http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala deleted file mode 100644 index 2f0a4db..0000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.analysis - -import java.util.concurrent.ConcurrentHashMap - -import scala.collection.JavaConverters._ - -import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf, TableIdentifier} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} - - -/** - * An interface for looking up relations by name. Used by an [[Analyzer]]. - */ -trait Catalog { - - val conf: CatalystConf - - def tableExists(tableIdent: TableIdentifier): Boolean - - def lookupRelation(tableIdent: TableIdentifier, alias: Option[String] = None): LogicalPlan - - def setCurrentDatabase(databaseName: String): Unit = { - throw new UnsupportedOperationException - } - - /** - * Returns tuples of (tableName, isTemporary) for all tables in the given database. - * isTemporary is a Boolean value indicates if a table is a temporary or not. - */ - def getTables(databaseName: Option[String]): Seq[(String, Boolean)] - - def refreshTable(tableIdent: TableIdentifier): Unit - - def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit - - def unregisterTable(tableIdent: TableIdentifier): Unit - - def unregisterAllTables(): Unit - - /** - * Get the table name of TableIdentifier for temporary tables. 
- */ - protected def getTableName(tableIdent: TableIdentifier): String = { - // It is not allowed to specify database name for temporary tables. - // We check it here and throw exception if database is defined. - if (tableIdent.database.isDefined) { - throw new AnalysisException("Specifying database name or other qualifiers are not allowed " + - "for temporary tables. If the table name has dots (.) in it, please quote the " + - "table name with backticks (`).") - } - if (conf.caseSensitiveAnalysis) { - tableIdent.table - } else { - tableIdent.table.toLowerCase - } - } -} - -class SimpleCatalog(val conf: CatalystConf) extends Catalog { - private[this] val tables = new ConcurrentHashMap[String, LogicalPlan] - - override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = { - tables.put(getTableName(tableIdent), plan) - } - - override def unregisterTable(tableIdent: TableIdentifier): Unit = { - tables.remove(getTableName(tableIdent)) - } - - override def unregisterAllTables(): Unit = { - tables.clear() - } - - override def tableExists(tableIdent: TableIdentifier): Boolean = { - tables.containsKey(getTableName(tableIdent)) - } - - override def lookupRelation( - tableIdent: TableIdentifier, - alias: Option[String] = None): LogicalPlan = { - val tableName = getTableName(tableIdent) - val table = tables.get(tableName) - if (table == null) { - throw new AnalysisException("Table not found: " + tableName) - } - val qualifiedTable = SubqueryAlias(tableName, table) - - // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are - // properly qualified with this alias. 
- alias - .map(a => SubqueryAlias(a, qualifiedTable)) - .getOrElse(qualifiedTable) - } - - override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { - tables.keySet().asScala.map(_ -> true).toSeq - } - - override def refreshTable(tableIdent: TableIdentifier): Unit = { - throw new UnsupportedOperationException - } -} - -/** - * A trait that can be mixed in with other Catalogs allowing specific tables to be overridden with - * new logical plans. This can be used to bind query result to virtual tables, or replace tables - * with in-memory cached versions. Note that the set of overrides is stored in memory and thus - * lost when the JVM exits. - */ -trait OverrideCatalog extends Catalog { - private[this] val overrides = new ConcurrentHashMap[String, LogicalPlan] - - private def getOverriddenTable(tableIdent: TableIdentifier): Option[LogicalPlan] = { - if (tableIdent.database.isDefined) { - None - } else { - Option(overrides.get(getTableName(tableIdent))) - } - } - - abstract override def tableExists(tableIdent: TableIdentifier): Boolean = { - getOverriddenTable(tableIdent) match { - case Some(_) => true - case None => super.tableExists(tableIdent) - } - } - - abstract override def lookupRelation( - tableIdent: TableIdentifier, - alias: Option[String] = None): LogicalPlan = { - getOverriddenTable(tableIdent) match { - case Some(table) => - val tableName = getTableName(tableIdent) - val qualifiedTable = SubqueryAlias(tableName, table) - - // If an alias was specified by the lookup, wrap the plan in a sub-query so that attributes - // are properly qualified with this alias. 
- alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable) - - case None => super.lookupRelation(tableIdent, alias) - } - } - - abstract override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { - overrides.keySet().asScala.map(_ -> true).toSeq ++ super.getTables(databaseName) - } - - override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = { - overrides.put(getTableName(tableIdent), plan) - } - - override def unregisterTable(tableIdent: TableIdentifier): Unit = { - if (tableIdent.database.isEmpty) { - overrides.remove(getTableName(tableIdent)) - } - } - - override def unregisterAllTables(): Unit = { - overrides.clear() - } -} - -/** - * A trivial catalog that returns an error when a relation is requested. Used for testing when all - * relations are already filled in and the analyzer needs only to resolve attribute references. - */ -object EmptyCatalog extends Catalog { - - override val conf: CatalystConf = EmptyConf - - override def tableExists(tableIdent: TableIdentifier): Boolean = { - throw new UnsupportedOperationException - } - - override def lookupRelation( - tableIdent: TableIdentifier, - alias: Option[String] = None): LogicalPlan = { - throw new UnsupportedOperationException - } - - override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { - throw new UnsupportedOperationException - } - - override def registerTable(tableIdent: TableIdentifier, plan: LogicalPlan): Unit = { - throw new UnsupportedOperationException - } - - override def unregisterTable(tableIdent: TableIdentifier): Unit = { - throw new UnsupportedOperationException - } - - override def unregisterAllTables(): Unit = { - throw new UnsupportedOperationException - } - - override def refreshTable(tableIdent: TableIdentifier): Unit = { - throw new UnsupportedOperationException - } -} 
http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 9518309..e73d367 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -34,7 +34,7 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str errors.TreeNodeException(tree, s"Invalid call to $function on unresolved object", null) /** - * Holds the name of a relation that has yet to be looked up in a [[Catalog]]. + * Holds the name of a relation that has yet to be looked up in a catalog. */ case class UnresolvedRelation( tableIdentifier: TableIdentifier, http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 7ead1dd..e216fa5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -52,37 +52,34 @@ class InMemoryCatalog extends ExternalCatalog { names.filter { funcName => regex.pattern.matcher(funcName).matches() } } - private def existsFunction(db: String, funcName: String): Boolean = { + private def functionExists(db: String, funcName: String): Boolean = { requireDbExists(db) catalog(db).functions.contains(funcName) } - 
private def existsTable(db: String, table: String): Boolean = { - requireDbExists(db) - catalog(db).tables.contains(table) - } - - private def existsPartition(db: String, table: String, spec: TablePartitionSpec): Boolean = { + private def partitionExists(db: String, table: String, spec: TablePartitionSpec): Boolean = { requireTableExists(db, table) catalog(db).tables(table).partitions.contains(spec) } private def requireFunctionExists(db: String, funcName: String): Unit = { - if (!existsFunction(db, funcName)) { - throw new AnalysisException(s"Function '$funcName' does not exist in database '$db'") + if (!functionExists(db, funcName)) { + throw new AnalysisException( + s"Function not found: '$funcName' does not exist in database '$db'") } } private def requireTableExists(db: String, table: String): Unit = { - if (!existsTable(db, table)) { - throw new AnalysisException(s"Table '$table' does not exist in database '$db'") + if (!tableExists(db, table)) { + throw new AnalysisException( + s"Table not found: '$table' does not exist in database '$db'") } } private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = { - if (!existsPartition(db, table, spec)) { + if (!partitionExists(db, table, spec)) { throw new AnalysisException( - s"Partition does not exist in database '$db' table '$table': '$spec'") + s"Partition not found: database '$db' table '$table' does not contain: '$spec'") } } @@ -159,7 +156,7 @@ class InMemoryCatalog extends ExternalCatalog { ignoreIfExists: Boolean): Unit = synchronized { requireDbExists(db) val table = tableDefinition.name.table - if (existsTable(db, table)) { + if (tableExists(db, table)) { if (!ignoreIfExists) { throw new AnalysisException(s"Table '$table' already exists in database '$db'") } @@ -173,7 +170,7 @@ class InMemoryCatalog extends ExternalCatalog { table: String, ignoreIfNotExists: Boolean): Unit = synchronized { requireDbExists(db) - if (existsTable(db, table)) { + if (tableExists(db, table)) 
{ catalog(db).tables.remove(table) } else { if (!ignoreIfNotExists) { @@ -200,13 +197,17 @@ class InMemoryCatalog extends ExternalCatalog { catalog(db).tables(table).table } + override def tableExists(db: String, table: String): Boolean = synchronized { + requireDbExists(db) + catalog(db).tables.contains(table) + } + override def listTables(db: String): Seq[String] = synchronized { requireDbExists(db) catalog(db).tables.keySet.toSeq } override def listTables(db: String, pattern: String): Seq[String] = synchronized { - requireDbExists(db) filterPattern(listTables(db), pattern) } @@ -295,7 +296,7 @@ class InMemoryCatalog extends ExternalCatalog { override def createFunction(db: String, func: CatalogFunction): Unit = synchronized { requireDbExists(db) - if (existsFunction(db, func.name.funcName)) { + if (functionExists(db, func.name.funcName)) { throw new AnalysisException(s"Function '$func' already exists in '$db' database") } else { catalog(db).functions.put(func.name.funcName, func) http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 3ac2bcf..34265fa 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConverters._ import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} 
@@ -31,17 +32,34 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary * tables and functions of the Spark Session that it belongs to. */ -class SessionCatalog(externalCatalog: ExternalCatalog) { +class SessionCatalog(externalCatalog: ExternalCatalog, conf: CatalystConf) { import ExternalCatalog._ - private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan] - private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction] + def this(externalCatalog: ExternalCatalog) { + this(externalCatalog, new SimpleCatalystConf(true)) + } + + protected[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan] + protected[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction] // Note: we track current database here because certain operations do not explicitly // specify the database (e.g. DROP TABLE my_table). In these cases we must first // check whether the temporary table or function exists, then, if not, operate on // the corresponding item in the current database. - private[this] var currentDb = "default" + protected[this] var currentDb = { + val defaultName = "default" + val defaultDbDefinition = CatalogDatabase(defaultName, "default database", "", Map()) + // Initialize default database if it doesn't already exist + createDatabase(defaultDbDefinition, ignoreIfExists = true) + defaultName + } + + /** + * Format table name, taking into account case sensitivity. 
+ */ + protected[this] def formatTableName(name: String): String = { + if (conf.caseSensitiveAnalysis) name else name.toLowerCase + } // ---------------------------------------------------------------------------- // Databases @@ -105,8 +123,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = { val db = tableDefinition.name.database.getOrElse(currentDb) - val newTableDefinition = tableDefinition.copy( - name = TableIdentifier(tableDefinition.name.table, Some(db))) + val table = formatTableName(tableDefinition.name.table) + val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db))) externalCatalog.createTable(db, newTableDefinition, ignoreIfExists) } @@ -121,8 +139,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def alterTable(tableDefinition: CatalogTable): Unit = { val db = tableDefinition.name.database.getOrElse(currentDb) - val newTableDefinition = tableDefinition.copy( - name = TableIdentifier(tableDefinition.name.table, Some(db))) + val table = formatTableName(tableDefinition.name.table) + val newTableDefinition = tableDefinition.copy(name = TableIdentifier(table, Some(db))) externalCatalog.alterTable(db, newTableDefinition) } @@ -132,7 +150,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def getTable(name: TableIdentifier): CatalogTable = { val db = name.database.getOrElse(currentDb) - externalCatalog.getTable(db, name.table) + val table = formatTableName(name.table) + externalCatalog.getTable(db, table) } // ------------------------------------------------------------- @@ -146,10 +165,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { name: String, tableDefinition: LogicalPlan, ignoreIfExists: Boolean): Unit = { - if (tempTables.containsKey(name) && !ignoreIfExists) { + val table = formatTableName(name) + if (tempTables.containsKey(table) && !ignoreIfExists) { throw new 
AnalysisException(s"Temporary table '$name' already exists.") } - tempTables.put(name, tableDefinition) + tempTables.put(table, tableDefinition) } /** @@ -166,11 +186,13 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { throw new AnalysisException("rename does not support moving tables across databases") } val db = oldName.database.getOrElse(currentDb) - if (oldName.database.isDefined || !tempTables.containsKey(oldName.table)) { - externalCatalog.renameTable(db, oldName.table, newName.table) + val oldTableName = formatTableName(oldName.table) + val newTableName = formatTableName(newName.table) + if (oldName.database.isDefined || !tempTables.containsKey(oldTableName)) { + externalCatalog.renameTable(db, oldTableName, newTableName) } else { - val table = tempTables.remove(oldName.table) - tempTables.put(newName.table, table) + val table = tempTables.remove(oldTableName) + tempTables.put(newTableName, table) } } @@ -183,10 +205,11 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = { val db = name.database.getOrElse(currentDb) - if (name.database.isDefined || !tempTables.containsKey(name.table)) { - externalCatalog.dropTable(db, name.table, ignoreIfNotExists) + val table = formatTableName(name.table) + if (name.database.isDefined || !tempTables.containsKey(table)) { + externalCatalog.dropTable(db, table, ignoreIfNotExists) } else { - tempTables.remove(name.table) + tempTables.remove(table) } } @@ -199,29 +222,44 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = { val db = name.database.getOrElse(currentDb) + val table = formatTableName(name.table) val relation = - if (name.database.isDefined || !tempTables.containsKey(name.table)) { - val metadata = externalCatalog.getTable(db, name.table) + if (name.database.isDefined || !tempTables.containsKey(table)) { + val metadata = 
externalCatalog.getTable(db, table) CatalogRelation(db, metadata, alias) } else { - tempTables.get(name.table) + tempTables.get(table) } - val qualifiedTable = SubqueryAlias(name.table, relation) + val qualifiedTable = SubqueryAlias(table, relation) // If an alias was specified by the lookup, wrap the plan in a subquery so that // attributes are properly qualified with this alias. alias.map(a => SubqueryAlias(a, qualifiedTable)).getOrElse(qualifiedTable) } /** - * List all tables in the specified database, including temporary tables. + * Return whether a table with the specified name exists. + * + * Note: If a database is explicitly specified, then this will return whether the table + * exists in that particular database instead. In that case, even if there is a temporary + * table with the same name, we will return false if the specified database does not + * contain the table. */ - def listTables(db: String): Seq[TableIdentifier] = { - val dbTables = externalCatalog.listTables(db).map { t => TableIdentifier(t, Some(db)) } - val _tempTables = tempTables.keys().asScala.map { t => TableIdentifier(t) } - dbTables ++ _tempTables + def tableExists(name: TableIdentifier): Boolean = { + val db = name.database.getOrElse(currentDb) + val table = formatTableName(name.table) + if (name.database.isDefined || !tempTables.containsKey(table)) { + externalCatalog.tableExists(db, table) + } else { + true // it's a temporary table + } } /** + * List all tables in the specified database, including temporary tables. + */ + def listTables(db: String): Seq[TableIdentifier] = listTables(db, "*") + + /** * List all matching tables in the specified database, including temporary tables. */ def listTables(db: String, pattern: String): Seq[TableIdentifier] = { @@ -235,6 +273,19 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { } /** + * Refresh the cache entry for a metastore table, if any. 
+ */ + def refreshTable(name: TableIdentifier): Unit = { /* no-op */ } + + /** + * Drop all existing temporary tables. + * For testing only. + */ + def clearTempTables(): Unit = { + tempTables.clear() + } + + /** * Return a temporary table exactly as it was stored. * For testing only. */ @@ -263,7 +314,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { parts: Seq[CatalogTablePartition], ignoreIfExists: Boolean): Unit = { val db = tableName.database.getOrElse(currentDb) - externalCatalog.createPartitions(db, tableName.table, parts, ignoreIfExists) + val table = formatTableName(tableName.table) + externalCatalog.createPartitions(db, table, parts, ignoreIfExists) } /** @@ -275,7 +327,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { parts: Seq[TablePartitionSpec], ignoreIfNotExists: Boolean): Unit = { val db = tableName.database.getOrElse(currentDb) - externalCatalog.dropPartitions(db, tableName.table, parts, ignoreIfNotExists) + val table = formatTableName(tableName.table) + externalCatalog.dropPartitions(db, table, parts, ignoreIfNotExists) } /** @@ -289,7 +342,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { specs: Seq[TablePartitionSpec], newSpecs: Seq[TablePartitionSpec]): Unit = { val db = tableName.database.getOrElse(currentDb) - externalCatalog.renamePartitions(db, tableName.table, specs, newSpecs) + val table = formatTableName(tableName.table) + externalCatalog.renamePartitions(db, table, specs, newSpecs) } /** @@ -303,7 +357,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = { val db = tableName.database.getOrElse(currentDb) - externalCatalog.alterPartitions(db, tableName.table, parts) + val table = formatTableName(tableName.table) + externalCatalog.alterPartitions(db, table, parts) } /** @@ -312,7 +367,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def getPartition(tableName: TableIdentifier, spec: 
TablePartitionSpec): CatalogTablePartition = { val db = tableName.database.getOrElse(currentDb) - externalCatalog.getPartition(db, tableName.table, spec) + val table = formatTableName(tableName.table) + externalCatalog.getPartition(db, table, spec) } /** @@ -321,7 +377,8 @@ class SessionCatalog(externalCatalog: ExternalCatalog) { */ def listPartitions(tableName: TableIdentifier): Seq[CatalogTablePartition] = { val db = tableName.database.getOrElse(currentDb) - externalCatalog.listPartitions(db, tableName.table) + val table = formatTableName(tableName.table) + externalCatalog.listPartitions(db, table) } // ---------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index c4e4961..3480313 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -91,6 +91,8 @@ abstract class ExternalCatalog { def getTable(db: String, table: String): CatalogTable + def tableExists(db: String, table: String): Boolean + def listTables(db: String): Seq[String] def listTables(db: String, pattern: String): Seq[String] http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 9563f43..346e052 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -161,14 +161,10 @@ class AnalysisSuite extends AnalysisTest { } test("resolve relations") { - assertAnalysisError( - UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq("Table not found: tAbLe")) - + assertAnalysisError(UnresolvedRelation(TableIdentifier("tAbLe"), None), Seq()) checkAnalysis(UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation) - checkAnalysis( UnresolvedRelation(TableIdentifier("tAbLe"), None), testRelation, caseSensitive = false) - checkAnalysis( UnresolvedRelation(TableIdentifier("TaBlE"), None), testRelation, caseSensitive = false) } http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala index 39166c4..6fa4bee 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisTest.scala @@ -18,26 +18,21 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier} +import org.apache.spark.sql.catalyst.SimpleCatalystConf +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical._ trait AnalysisTest extends PlanTest { - val (caseSensitiveAnalyzer, caseInsensitiveAnalyzer) = { - val caseSensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = true) - val 
caseInsensitiveConf = new SimpleCatalystConf(caseSensitiveAnalysis = false) + protected val caseSensitiveAnalyzer = makeAnalyzer(caseSensitive = true) + protected val caseInsensitiveAnalyzer = makeAnalyzer(caseSensitive = false) - val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf) - val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf) - - caseSensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation) - caseInsensitiveCatalog.registerTable(TableIdentifier("TaBlE"), TestRelations.testRelation) - - new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf) { - override val extendedResolutionRules = EliminateSubqueryAliases :: Nil - } -> - new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseInsensitiveConf) { + private def makeAnalyzer(caseSensitive: Boolean): Analyzer = { + val conf = new SimpleCatalystConf(caseSensitive) + val catalog = new SessionCatalog(new InMemoryCatalog, conf) + catalog.createTempTable("TaBlE", TestRelations.testRelation, ignoreIfExists = true) + new Analyzer(catalog, EmptyFunctionRegistry, conf) { override val extendedResolutionRules = EliminateSubqueryAliases :: Nil } } http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala index 9aa685e..3150186 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.catalyst.analysis import org.scalatest.BeforeAndAfter -import 
org.apache.spark.sql.catalyst.{SimpleCatalystConf, TableIdentifier} +import org.apache.spark.sql.catalyst.SimpleCatalystConf +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -30,11 +31,11 @@ import org.apache.spark.sql.types._ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter { - val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true) - val catalog = new SimpleCatalog(conf) - val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf) + private val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true) + private val catalog = new SessionCatalog(new InMemoryCatalog, conf) + private val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf) - val relation = LocalRelation( + private val relation = LocalRelation( AttributeReference("i", IntegerType)(), AttributeReference("d1", DecimalType(2, 1))(), AttributeReference("d2", DecimalType(5, 2))(), @@ -43,15 +44,15 @@ class DecimalPrecisionSuite extends PlanTest with BeforeAndAfter { AttributeReference("b", DoubleType)() ) - val i: Expression = UnresolvedAttribute("i") - val d1: Expression = UnresolvedAttribute("d1") - val d2: Expression = UnresolvedAttribute("d2") - val u: Expression = UnresolvedAttribute("u") - val f: Expression = UnresolvedAttribute("f") - val b: Expression = UnresolvedAttribute("b") + private val i: Expression = UnresolvedAttribute("i") + private val d1: Expression = UnresolvedAttribute("d1") + private val d2: Expression = UnresolvedAttribute("d2") + private val u: Expression = UnresolvedAttribute("u") + private val f: Expression = UnresolvedAttribute("f") + private val b: Expression = UnresolvedAttribute("b") before { - catalog.registerTable(TableIdentifier("table"), relation) + catalog.createTempTable("table", relation, ignoreIfExists = true) } private def 
checkType(expression: Expression, expectedType: DataType): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala index a1ea619..277c2d7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala @@ -225,13 +225,14 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach { test("list tables without pattern") { val catalog = newBasicCatalog() + intercept[AnalysisException] { catalog.listTables("unknown_db") } assert(catalog.listTables("db1").toSet == Set.empty) assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2")) } test("list tables with pattern") { val catalog = newBasicCatalog() - intercept[AnalysisException] { catalog.listTables("unknown_db") } + intercept[AnalysisException] { catalog.listTables("unknown_db", "*") } assert(catalog.listTables("db1", "*").toSet == Set.empty) assert(catalog.listTables("db2", "*").toSet == Set("tbl1", "tbl2")) assert(catalog.listTables("db2", "tbl*").toSet == Set("tbl1", "tbl2")) http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index e1973ee..74e995c 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -397,6 +397,24 @@ class SessionCatalogSuite extends SparkFunSuite { TableIdentifier("tbl1", Some("db2")), alias = Some(alias)) == relationWithAlias) } + test("table exists") { + val catalog = new SessionCatalog(newBasicCatalog()) + assert(catalog.tableExists(TableIdentifier("tbl1", Some("db2")))) + assert(catalog.tableExists(TableIdentifier("tbl2", Some("db2")))) + assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2")))) + assert(!catalog.tableExists(TableIdentifier("tbl1", Some("db1")))) + assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1")))) + // If database is explicitly specified, do not check temporary tables + val tempTable = Range(1, 10, 1, 10, Seq()) + catalog.createTempTable("tbl3", tempTable, ignoreIfExists = false) + assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2")))) + // If database is not explicitly specified, check the current database + catalog.setCurrentDatabase("db2") + assert(catalog.tableExists(TableIdentifier("tbl1"))) + assert(catalog.tableExists(TableIdentifier("tbl2"))) + assert(catalog.tableExists(TableIdentifier("tbl3"))) + } + test("list tables without pattern") { val catalog = new SessionCatalog(newBasicCatalog()) val tempTable = Range(1, 10, 2, 10, Seq()) @@ -429,7 +447,7 @@ class SessionCatalogSuite extends SparkFunSuite { assert(catalog.listTables("db2", "*1").toSet == Set(TableIdentifier("tbl1"), TableIdentifier("tbl1", Some("db2")))) intercept[AnalysisException] { - catalog.listTables("unknown_db") + catalog.listTables("unknown_db", "*") } } http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala ---------------------------------------------------------------------- diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala index 2ab31ee..e2c76b7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.SimpleCatalystConf import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ @@ -137,11 +138,11 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper { checkCondition(!(('a || 'b) && ('c || 'd)), (!'a && !'b) || (!'c && !'d)) } - private val caseInsensitiveAnalyzer = - new Analyzer( - EmptyCatalog, - EmptyFunctionRegistry, - new SimpleCatalystConf(caseSensitiveAnalysis = false)) + private val caseInsensitiveConf = new SimpleCatalystConf(false) + private val caseInsensitiveAnalyzer = new Analyzer( + new SessionCatalog(new InMemoryCatalog, caseInsensitiveConf), + EmptyFunctionRegistry, + caseInsensitiveConf) test("(a && b) || (a && c) => a && (b || c) when case insensitive") { val plan = caseInsensitiveAnalyzer.execute( http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala index a4c8d1c..3824c67 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.SimpleCatalystConf -import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry, SimpleCatalog} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, EmptyFunctionRegistry} +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ @@ -28,7 +29,7 @@ import org.apache.spark.sql.catalyst.rules._ class EliminateSortsSuite extends PlanTest { val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true, orderByOrdinal = false) - val catalog = new SimpleCatalog(conf) + val catalog = new SessionCatalog(new InMemoryCatalog, conf) val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf) object Optimize extends RuleExecutor[LogicalPlan] { http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 853a74c..e413e77 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -25,13 +25,14 @@ import scala.collection.JavaConverters._ import scala.collection.immutable import scala.reflect.runtime.universe.TypeTag -import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.{SparkConf, SparkContext, SparkException} import org.apache.spark.annotation.{DeveloperApi, Experimental} import 
org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.catalyst._ +import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog} import org.apache.spark.sql.catalyst.encoders.encoderFor import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range} @@ -65,13 +66,14 @@ class SQLContext private[sql]( @transient val sparkContext: SparkContext, @transient protected[sql] val cacheManager: CacheManager, @transient private[sql] val listener: SQLListener, - val isRootContext: Boolean) + val isRootContext: Boolean, + @transient private[sql] val externalCatalog: ExternalCatalog) extends Logging with Serializable { self => - def this(sparkContext: SparkContext) = { - this(sparkContext, new CacheManager, SQLContext.createListenerAndUI(sparkContext), true) + def this(sc: SparkContext) = { + this(sc, new CacheManager, SQLContext.createListenerAndUI(sc), true, new InMemoryCatalog) } def this(sparkContext: JavaSparkContext) = this(sparkContext.sc) @@ -109,7 +111,8 @@ class SQLContext private[sql]( sparkContext = sparkContext, cacheManager = cacheManager, listener = listener, - isRootContext = false) + isRootContext = false, + externalCatalog = externalCatalog) } /** @@ -186,6 +189,12 @@ class SQLContext private[sql]( */ def getAllConfs: immutable.Map[String, String] = conf.getAllConfs + // Extract `spark.sql.*` entries and put it in our SQLConf. + // Subclasses may additionally set these entries in other confs. 
+ SQLContext.getSQLProperties(sparkContext.getConf).asScala.foreach { case (k, v) => + setConf(k, v) + } + protected[sql] def parseSql(sql: String): LogicalPlan = sessionState.sqlParser.parsePlan(sql) protected[sql] def executeSql(sql: String): QueryExecution = executePlan(parseSql(sql)) @@ -199,30 +208,6 @@ class SQLContext private[sql]( sparkContext.addJar(path) } - { - // We extract spark sql settings from SparkContext's conf and put them to - // Spark SQL's conf. - // First, we populate the SQLConf (conf). So, we can make sure that other values using - // those settings in their construction can get the correct settings. - // For example, metadataHive in HiveContext may need both spark.sql.hive.metastore.version - // and spark.sql.hive.metastore.jars to get correctly constructed. - val properties = new Properties - sparkContext.getConf.getAll.foreach { - case (key, value) if key.startsWith("spark.sql") => properties.setProperty(key, value) - case _ => - } - // We directly put those settings to conf to avoid of calling setConf, which may have - // side-effects. For example, in HiveContext, setConf may cause executionHive and metadataHive - // get constructed. If we call setConf directly, the constructed metadataHive may have - // wrong settings, or the construction may fail. - conf.setConf(properties) - // After we have populated SQLConf, we call setConf to populate other confs in the subclass - // (e.g. hiveconf in HiveContext). - properties.asScala.foreach { - case (key, value) => setConf(key, value) - } - } - /** * :: Experimental :: * A collection of methods that are considered experimental, but can be used to hook into @@ -683,8 +668,10 @@ class SQLContext private[sql]( * only during the lifetime of this instance of SQLContext. 
*/ private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = { - sessionState.catalog.registerTable( - sessionState.sqlParser.parseTableIdentifier(tableName), df.logicalPlan) + sessionState.catalog.createTempTable( + sessionState.sqlParser.parseTableIdentifier(tableName).table, + df.logicalPlan, + ignoreIfExists = true) } /** @@ -697,7 +684,7 @@ class SQLContext private[sql]( */ def dropTempTable(tableName: String): Unit = { cacheManager.tryUncacheQuery(table(tableName)) - sessionState.catalog.unregisterTable(TableIdentifier(tableName)) + sessionState.catalog.dropTable(TableIdentifier(tableName), ignoreIfNotExists = true) } /** @@ -824,9 +811,7 @@ class SQLContext private[sql]( * @since 1.3.0 */ def tableNames(): Array[String] = { - sessionState.catalog.getTables(None).map { - case (tableName, _) => tableName - }.toArray + tableNames(sessionState.catalog.getCurrentDatabase) } /** @@ -836,9 +821,7 @@ class SQLContext private[sql]( * @since 1.3.0 */ def tableNames(databaseName: String): Array[String] = { - sessionState.catalog.getTables(Some(databaseName)).map { - case (tableName, _) => tableName - }.toArray + sessionState.catalog.listTables(databaseName).map(_.table).toArray } @transient @@ -1025,4 +1008,18 @@ object SQLContext { } sqlListener.get() } + + /** + * Extract `spark.sql.*` properties from the conf and return them as a [[Properties]]. 
+ */ + private[sql] def getSQLProperties(sparkConf: SparkConf): Properties = { + val properties = new Properties + sparkConf.getAll.foreach { case (key, value) => + if (key.startsWith("spark.sql")) { + properties.setProperty(key, value) + } + } + properties + } + } http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 59c3ffc..964f0a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -339,10 +339,12 @@ case class ShowTablesCommand(databaseName: Option[String]) extends RunnableComma override def run(sqlContext: SQLContext): Seq[Row] = { // Since we need to return a Seq of rows, we will call getTables directly // instead of calling tables in sqlContext. 
- val rows = sqlContext.sessionState.catalog.getTables(databaseName).map { - case (tableName, isTemporary) => Row(tableName, isTemporary) + val catalog = sqlContext.sessionState.catalog + val db = databaseName.getOrElse(catalog.getCurrentDatabase) + val rows = catalog.listTables(db).map { t => + val isTemp = t.database.isEmpty + Row(t.table, isTemp) } - rows } } http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 9e8e035..24923bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -93,15 +93,21 @@ case class CreateTempTableUsing( provider: String, options: Map[String, String]) extends RunnableCommand { + if (tableIdent.database.isDefined) { + throw new AnalysisException( + s"Temporary table '$tableIdent' should not have specified a database") + } + def run(sqlContext: SQLContext): Seq[Row] = { val dataSource = DataSource( sqlContext, userSpecifiedSchema = userSpecifiedSchema, className = provider, options = options) - sqlContext.sessionState.catalog.registerTable( - tableIdent, - Dataset.ofRows(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan) + sqlContext.sessionState.catalog.createTempTable( + tableIdent.table, + Dataset.ofRows(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan, + ignoreIfExists = true) Seq.empty[Row] } @@ -115,6 +121,11 @@ case class CreateTempTableUsingAsSelect( options: Map[String, String], query: LogicalPlan) extends RunnableCommand { + if (tableIdent.database.isDefined) { + throw new AnalysisException( + s"Temporary table '$tableIdent' should not have 
specified a database") + } + override def run(sqlContext: SQLContext): Seq[Row] = { val df = Dataset.ofRows(sqlContext, query) val dataSource = DataSource( @@ -124,9 +135,10 @@ case class CreateTempTableUsingAsSelect( bucketSpec = None, options = options) val result = dataSource.write(mode, df) - sqlContext.sessionState.catalog.registerTable( - tableIdent, - Dataset.ofRows(sqlContext, LogicalRelation(result)).logicalPlan) + sqlContext.sessionState.catalog.createTempTable( + tableIdent.table, + Dataset.ofRows(sqlContext, LogicalRelation(result)).logicalPlan, + ignoreIfExists = true) Seq.empty[Row] } http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 63f0e4f..28ac458 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -19,10 +19,12 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext} import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrdering} import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation, InsertableRelation} /** @@ -99,7 +101,9 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] { /** * A rule to do various checks before inserting into or writing to a data source table. 
*/ -private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => Unit) { +private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) + extends (LogicalPlan => Unit) { + def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) } def apply(plan: LogicalPlan): Unit = { @@ -139,7 +143,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => } PartitioningUtils.validatePartitionColumnDataTypes( - r.schema, part.keySet.toSeq, catalog.conf.caseSensitiveAnalysis) + r.schema, part.keySet.toSeq, conf.caseSensitiveAnalysis) // Get all input data source relations of the query. val srcRelations = query.collect { @@ -190,7 +194,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => } PartitioningUtils.validatePartitionColumnDataTypes( - c.child.schema, c.partitionColumns, catalog.conf.caseSensitiveAnalysis) + c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis) for { spec <- c.bucketSpec http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala index e6be0ab..e5f02ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala @@ -18,7 +18,8 @@ package org.apache.spark.sql.internal import org.apache.spark.sql.{ContinuousQueryManager, ExperimentalMethods, SQLContext, UDFRegistration} -import org.apache.spark.sql.catalyst.analysis.{Analyzer, Catalog, FunctionRegistry, SimpleCatalog} +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} +import org.apache.spark.sql.catalyst.catalog.SessionCatalog import 
org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.rules.RuleExecutor @@ -45,7 +46,7 @@ private[sql] class SessionState(ctx: SQLContext) { /** * Internal catalog for managing table and database states. */ - lazy val catalog: Catalog = new SimpleCatalog(conf) + lazy val catalog = new SessionCatalog(ctx.externalCatalog, conf) /** * Internal catalog for managing functions registered by the user. @@ -68,7 +69,7 @@ private[sql] class SessionState(ctx: SQLContext) { DataSourceAnalysis :: (if (conf.runSQLOnFile) new ResolveDataSource(ctx) :: Nil else Nil) - override val extendedCheckRules = Seq(datasources.PreWriteCheck(catalog)) + override val extendedCheckRules = Seq(datasources.PreWriteCheck(conf, catalog)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala index 2820e4f..bb54c52 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala @@ -33,7 +33,8 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex } after { - sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) + sqlContext.sessionState.catalog.dropTable( + TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true) } test("get all tables") { @@ -45,20 +46,22 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContex sql("SHOW tables").filter("tableName = 'ListTablesSuiteTable'"), Row("ListTablesSuiteTable", true)) - sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) + 
sqlContext.sessionState.catalog.dropTable( + TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true) assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0) } - test("getting all Tables with a database name has no impact on returned table names") { + test("getting all tables with a database name has no impact on returned table names") { checkAnswer( - sqlContext.tables("DB").filter("tableName = 'ListTablesSuiteTable'"), + sqlContext.tables("default").filter("tableName = 'ListTablesSuiteTable'"), Row("ListTablesSuiteTable", true)) checkAnswer( - sql("show TABLES in DB").filter("tableName = 'ListTablesSuiteTable'"), + sql("show TABLES in default").filter("tableName = 'ListTablesSuiteTable'"), Row("ListTablesSuiteTable", true)) - sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable")) + sqlContext.sessionState.catalog.dropTable( + TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true) assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0) } http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala index 2ad92b5..2f62ad4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.SQLConf -class SQLContextSuite extends SparkFunSuite with SharedSparkContext{ +class SQLContextSuite extends SparkFunSuite with SharedSparkContext { object DummyRule extends Rule[LogicalPlan] { def apply(p: LogicalPlan): LogicalPlan = p @@ 
-78,4 +78,11 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext{ sqlContext.experimental.extraOptimizations = Seq(DummyRule) assert(sqlContext.sessionState.optimizer.batches.flatMap(_.rules).contains(DummyRule)) } + + test("SQLContext can access `spark.sql.*` configs") { + sc.conf.set("spark.sql.with.or.without.you", "my love") + val sqlContext = new SQLContext(sc) + assert(sqlContext.getConf("spark.sql.with.or.without.you") == "my love") + } + } http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 077e579..c958eac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1476,12 +1476,16 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { } test("SPARK-4699 case sensitivity SQL query") { - sqlContext.setConf(SQLConf.CASE_SENSITIVE, false) - val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil - val rdd = sparkContext.parallelize((0 to 1).map(i => data(i))) - rdd.toDF().registerTempTable("testTable1") - checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1")) - sqlContext.setConf(SQLConf.CASE_SENSITIVE, true) + val orig = sqlContext.getConf(SQLConf.CASE_SENSITIVE) + try { + sqlContext.setConf(SQLConf.CASE_SENSITIVE, false) + val data = TestData(1, "val_1") :: TestData(2, "val_2") :: Nil + val rdd = sparkContext.parallelize((0 to 1).map(i => data(i))) + rdd.toDF().registerTempTable("testTable1") + checkAnswer(sql("SELECT VALUE FROM TESTTABLE1 where KEY = 1"), Row("val_1")) + } finally { + sqlContext.setConf(SQLConf.CASE_SENSITIVE, orig) + } } test("SPARK-6145: ORDER BY test for nested fields") { @@ -1755,7 +1759,8 @@ class 
SQLQuerySuite extends QueryTest with SharedSQLContext { .format("parquet") .save(path) - val message = intercept[AnalysisException] { + // We don't support creating a temporary table while specifying a database + intercept[AnalysisException] { sqlContext.sql( s""" |CREATE TEMPORARY TABLE db.t @@ -1765,9 +1770,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { |) """.stripMargin) }.getMessage - assert(message.contains("Specifying database name or other qualifiers are not allowed")) - // If you use backticks to quote the name of a temporary table having dot in it. + // If you use backticks to quote the name then it's OK. sqlContext.sql( s""" |CREATE TEMPORARY TABLE `db.t` http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index f8166c7..2f806eb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -51,7 +51,8 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext sql("INSERT INTO TABLE t SELECT * FROM tmp") checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple)) } - sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp")) + sqlContext.sessionState.catalog.dropTable( + TableIdentifier("tmp"), ignoreIfNotExists = true) } test("overwriting") { @@ -61,7 +62,8 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp") checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple)) } 
- sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp")) + sqlContext.sessionState.catalog.dropTable( + TableIdentifier("tmp"), ignoreIfNotExists = true) } test("self-join") { http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala index d483585..80a85a6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala @@ -189,8 +189,8 @@ private[sql] trait SQLTestUtils * `f` returns. */ protected def activateDatabase(db: String)(f: => Unit): Unit = { - sqlContext.sql(s"USE $db") - try f finally sqlContext.sql(s"USE default") + sqlContext.sessionState.catalog.setCurrentDatabase(db) + try f finally sqlContext.sessionState.catalog.setCurrentDatabase("default") } /** http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 7fe31b0..5769328 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -150,7 +150,8 @@ private[hive] object SparkSQLCLIDriver extends Logging { } if (sessionState.database != null) { - SparkSQLEnv.hiveContext.runSqlHive(s"USE ${sessionState.database}") + 
SparkSQLEnv.hiveContext.sessionState.catalog.setCurrentDatabase( + s"${sessionState.database}") } // Execute -i init files (always in silent mode) http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 032965d..8e1ebe2 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -193,10 +193,7 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging { ) runCliWithin(2.minute, Seq("--database", "hive_test_db", "-e", "SHOW TABLES;"))( - "" - -> "OK", - "" - -> "hive_test" + "" -> "hive_test" ) } http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 05f59f1..650797f 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -60,16 +60,19 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { } override def afterAll() { - TestHive.cacheTables = false - TimeZone.setDefault(originalTimeZone) - Locale.setDefault(originalLocale) - 
TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize) - TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning) - TestHive.sessionState.functionRegistry.restore() - - // For debugging dump some statistics about how much time was spent in various optimizer rules. - logWarning(RuleExecutor.dumpTimeSpent()) - super.afterAll() + try { + TestHive.cacheTables = false + TimeZone.setDefault(originalTimeZone) + Locale.setDefault(originalLocale) + TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize) + TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning) + TestHive.sessionState.functionRegistry.restore() + + // For debugging dump some statistics about how much time was spent in various optimizer rules. + logWarning(RuleExecutor.dumpTimeSpent()) + } finally { + super.afterAll() + } } /** A list of tests deemed out of scope currently and thus completely disregarded. */ http://git-wip-us.apache.org/repos/asf/spark/blob/20ddf5fd/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala index 491f2ae..0722fb0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala @@ -85,7 +85,6 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit withClient { getTable(db, table) } } - // -------------------------------------------------------------------------- // Databases // -------------------------------------------------------------------------- @@ -182,6 +181,10 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit client.getTable(db, table) } + override def tableExists(db: String, table: String): Boolean = 
withClient { + client.getTableOption(db, table).isDefined + } + override def listTables(db: String): Seq[String] = withClient { requireDbExists(db) client.listTables(db) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
