This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fc9d421a2345 [SPARK-49211][SQL][FOLLOW-UP] Support catalog in
QualifiedTableName
fc9d421a2345 is described below
commit fc9d421a2345987105aa97947c867ac80ba48a05
Author: Rui Wang <[email protected]>
AuthorDate: Fri Sep 27 08:26:24 2024 +0800
[SPARK-49211][SQL][FOLLOW-UP] Support catalog in QualifiedTableName
### What changes were proposed in this pull request?
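Add a `catalog` field to `QualifiedTableName` (keeping a two-argument constructor that defaults to the session catalog for backward compatibility) and remove the now-redundant `FullQualifiedTableName`.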
### Why are the changes needed?
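`QualifiedTableName` and `FullQualifiedTableName` differed only in the catalog field; consolidating them into one class removes the duplicate code.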
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests (UT).
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48255 from amaliujia/qualifedtablename.
Authored-by: Rui Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/catalog/SessionCatalog.scala | 18 +++++++++---------
.../org/apache/spark/sql/catalyst/identifiers.scala | 21 +++++++++++++++++----
.../sql/catalyst/catalog/SessionCatalogSuite.scala | 8 ++++----
.../execution/datasources/DataSourceStrategy.scala | 4 ++--
.../execution/datasources/v2/V2SessionCatalog.scala | 4 ++--
.../spark/sql/StatisticsCollectionTestBase.scala | 4 ++--
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 10 +++++-----
.../spark/sql/execution/command/DDLSuite.scala | 6 +++---
.../execution/command/v1/TruncateTableSuite.scala | 4 ++--
.../spark/sql/hive/HiveMetastoreCatalog.scala | 12 ++++++------
10 files changed, 52 insertions(+), 39 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index d3a6cb6ae284..a0f7af10fefa 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -197,7 +197,7 @@ class SessionCatalog(
}
}
- private val tableRelationCache: Cache[FullQualifiedTableName, LogicalPlan] =
{
+ private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
var builder = CacheBuilder.newBuilder()
.maximumSize(cacheSize)
@@ -205,33 +205,33 @@ class SessionCatalog(
builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
}
- builder.build[FullQualifiedTableName, LogicalPlan]()
+ builder.build[QualifiedTableName, LogicalPlan]()
}
/** This method provides a way to get a cached plan. */
- def getCachedPlan(t: FullQualifiedTableName, c: Callable[LogicalPlan]):
LogicalPlan = {
+ def getCachedPlan(t: QualifiedTableName, c: Callable[LogicalPlan]):
LogicalPlan = {
tableRelationCache.get(t, c)
}
/** This method provides a way to get a cached plan if the key exists. */
- def getCachedTable(key: FullQualifiedTableName): LogicalPlan = {
+ def getCachedTable(key: QualifiedTableName): LogicalPlan = {
tableRelationCache.getIfPresent(key)
}
/** This method provides a way to cache a plan. */
- def cacheTable(t: FullQualifiedTableName, l: LogicalPlan): Unit = {
+ def cacheTable(t: QualifiedTableName, l: LogicalPlan): Unit = {
tableRelationCache.put(t, l)
}
/** This method provides a way to invalidate a cached plan. */
- def invalidateCachedTable(key: FullQualifiedTableName): Unit = {
+ def invalidateCachedTable(key: QualifiedTableName): Unit = {
tableRelationCache.invalidate(key)
}
/** This method discards any cached table relation plans for the given table
identifier. */
def invalidateCachedTable(name: TableIdentifier): Unit = {
val qualified = qualifyIdentifier(name)
- invalidateCachedTable(FullQualifiedTableName(
+ invalidateCachedTable(QualifiedTableName(
qualified.catalog.get, qualified.database.get, qualified.table))
}
@@ -301,7 +301,7 @@ class SessionCatalog(
}
if (cascade && databaseExists(dbName)) {
listTables(dbName).foreach { t =>
- invalidateCachedTable(FullQualifiedTableName(SESSION_CATALOG_NAME,
dbName, t.table))
+ invalidateCachedTable(QualifiedTableName(SESSION_CATALOG_NAME, dbName,
t.table))
}
}
externalCatalog.dropDatabase(dbName, ignoreIfNotExists, cascade)
@@ -1183,7 +1183,7 @@ class SessionCatalog(
def refreshTable(name: TableIdentifier): Unit = synchronized {
getLocalOrGlobalTempView(name).map(_.refresh()).getOrElse {
val qualifiedIdent = qualifyIdentifier(name)
- val qualifiedTableName = FullQualifiedTableName(
+ val qualifiedTableName = QualifiedTableName(
qualifiedIdent.catalog.get, qualifiedIdent.database.get,
qualifiedIdent.table)
tableRelationCache.invalidate(qualifiedTableName)
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
index cc881539002b..ceced9313940 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst
+import org.apache.spark.sql.connector.catalog.CatalogManager
+
/**
* An identifier that optionally specifies a database.
*
@@ -107,14 +109,25 @@ case class TableIdentifier(table: String, database:
Option[String], catalog: Opt
}
/** A fully qualified identifier for a table (i.e., database.tableName) */
-case class QualifiedTableName(database: String, name: String) {
- override def toString: String = s"$database.$name"
-}
+case class QualifiedTableName(catalog: String, database: String, name: String)
{
+ /** Two argument ctor for backward compatibility. */
+ def this(database: String, name: String) = this(
+ catalog = CatalogManager.SESSION_CATALOG_NAME,
+ database = database,
+ name = name)
-case class FullQualifiedTableName(catalog: String, database: String, name:
String) {
override def toString: String = s"$catalog.$database.$name"
}
+object QualifiedTableName {
+ def apply(catalog: String, database: String, name: String):
QualifiedTableName = {
+ new QualifiedTableName(catalog, database, name)
+ }
+
+ def apply(database: String, name: String): QualifiedTableName =
+ new QualifiedTableName(database = database, name = name)
+}
+
object TableIdentifier {
def apply(tableName: String): TableIdentifier = new
TableIdentifier(tableName)
def apply(table: String, database: Option[String]): TableIdentifier =
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index fbe63f71ae02..cfbc507fb5c7 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -22,7 +22,7 @@ import scala.concurrent.duration._
import org.scalatest.concurrent.Eventually
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{AliasIdentifier, FullQualifiedTableName,
FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{AliasIdentifier, FunctionIdentifier,
QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -1883,7 +1883,7 @@ abstract class SessionCatalogSuite extends AnalysisTest
with Eventually {
conf.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L)
withConfAndEmptyCatalog(conf) { catalog =>
- val table = FullQualifiedTableName(
+ val table = QualifiedTableName(
CatalogManager.SESSION_CATALOG_NAME, catalog.getCurrentDatabase,
"test")
// First, make sure the test table is not cached.
@@ -1903,14 +1903,14 @@ abstract class SessionCatalogSuite extends AnalysisTest
with Eventually {
test("SPARK-34197: refreshTable should not invalidate the relation cache for
temporary views") {
withBasicCatalog { catalog =>
createTempView(catalog, "tbl1", Range(1, 10, 1, 10), false)
- val qualifiedName1 = FullQualifiedTableName(SESSION_CATALOG_NAME,
"default", "tbl1")
+ val qualifiedName1 = QualifiedTableName(SESSION_CATALOG_NAME, "default",
"tbl1")
catalog.cacheTable(qualifiedName1, Range(1, 10, 1, 10))
catalog.refreshTable(TableIdentifier("tbl1"))
assert(catalog.getCachedTable(qualifiedName1) != null)
createGlobalTempView(catalog, "tbl2", Range(2, 10, 1, 10), false)
val qualifiedName2 =
- FullQualifiedTableName(SESSION_CATALOG_NAME,
catalog.globalTempDatabase, "tbl2")
+ QualifiedTableName(SESSION_CATALOG_NAME, catalog.globalTempDatabase,
"tbl2")
catalog.cacheTable(qualifiedName2, Range(2, 10, 1, 10))
catalog.refreshTable(TableIdentifier("tbl2",
Some(catalog.globalTempDatabase)))
assert(catalog.getCachedTable(qualifiedName2) != null)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 2be4b236872f..a2707da2d102 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -28,7 +28,7 @@ import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys.PREDICATES
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters,
FullQualifiedTableName, InternalRow, SQLConfHelper}
+import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters,
InternalRow, QualifiedTableName, SQLConfHelper}
import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog._
@@ -249,7 +249,7 @@ class FindDataSourceTable(sparkSession: SparkSession)
extends Rule[LogicalPlan]
private def readDataSourceTable(
table: CatalogTable, extraOptions: CaseInsensitiveStringMap):
LogicalPlan = {
val qualifiedTableName =
- FullQualifiedTableName(table.identifier.catalog.get, table.database,
table.identifier.table)
+ QualifiedTableName(table.identifier.catalog.get, table.database,
table.identifier.table)
val catalog = sparkSession.sessionState.catalog
val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions,
table)
catalog.getCachedPlan(qualifiedTableName, () => {
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index bd1df87d15c3..22c13fd98ced 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable
import scala.jdk.CollectionConverters._
import org.apache.spark.SparkUnsupportedOperationException
-import org.apache.spark.sql.catalyst.{FullQualifiedTableName,
FunctionIdentifier, SQLConfHelper, TableIdentifier}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName,
SQLConfHelper, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException,
NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase,
CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils,
ClusterBySpec, SessionCatalog}
import org.apache.spark.sql.catalyst.util.TypeUtils._
@@ -93,7 +93,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
// table here. To avoid breaking it we do not resolve the table provider
and still return
// `V1Table` if the custom session catalog is present.
if (table.provider.isDefined && !hasCustomSessionCatalog) {
- val qualifiedTableName = FullQualifiedTableName(
+ val qualifiedTableName = QualifiedTableName(
table.identifier.catalog.get, table.database, table.identifier.table)
// Check if the table is in the v1 table cache to skip the v2 table
lookup.
if (catalog.getCachedTable(qualifiedTableName) != null) {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
index 7fa29dd38fd9..74329ac0e0d2 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -25,7 +25,7 @@ import java.time.LocalDateTime
import scala.collection.mutable
import scala.util.Random
-import org.apache.spark.sql.catalyst.{FullQualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat,
CatalogStatistics, CatalogTable, HiveTableRelation}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.AttributeMap
@@ -270,7 +270,7 @@ abstract class StatisticsCollectionTestBase extends
QueryTest with SQLTestUtils
def getTableFromCatalogCache(tableName: String): LogicalPlan = {
val catalog = spark.sessionState.catalog
- val qualifiedTableName = FullQualifiedTableName(
+ val qualifiedTableName = QualifiedTableName(
CatalogManager.SESSION_CATALOG_NAME, catalog.getCurrentDatabase,
tableName)
catalog.getCachedTable(qualifiedTableName)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 7aaec6d500ba..dac066bbef83 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.{SparkException, SparkRuntimeException,
SparkUnsupportedOperationException}
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{FullQualifiedTableName, InternalRow,
TableIdentifier}
+import org.apache.spark.sql.catalyst.{InternalRow, QualifiedTableName,
TableIdentifier}
import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER
import
org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException,
NoSuchNamespaceException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat,
CatalogTable, CatalogTableType, CatalogUtils}
@@ -3713,7 +3713,7 @@ class DataSourceV2SQLSuiteV1Filter
// Reset CatalogManager to clear the materialized `spark_catalog`
instance, so that we can
// configure a new implementation.
- val table1 = FullQualifiedTableName(SESSION_CATALOG_NAME, "default", "t")
+ val table1 = QualifiedTableName(SESSION_CATALOG_NAME, "default", "t")
spark.sessionState.catalogManager.reset()
withSQLConf(
V2_SESSION_CATALOG_IMPLEMENTATION.key ->
@@ -3722,7 +3722,7 @@ class DataSourceV2SQLSuiteV1Filter
checkParquet(table1.toString, path.getAbsolutePath)
}
}
- val table2 = FullQualifiedTableName("testcat3", "default", "t")
+ val table2 = QualifiedTableName("testcat3", "default", "t")
withSQLConf(
"spark.sql.catalog.testcat3" ->
classOf[V2CatalogSupportBuiltinDataSource].getName) {
withTempPath { path =>
@@ -3741,7 +3741,7 @@ class DataSourceV2SQLSuiteV1Filter
// Reset CatalogManager to clear the materialized `spark_catalog`
instance, so that we can
// configure a new implementation.
spark.sessionState.catalogManager.reset()
- val table1 = FullQualifiedTableName(SESSION_CATALOG_NAME, "default", "t")
+ val table1 = QualifiedTableName(SESSION_CATALOG_NAME, "default", "t")
withSQLConf(
V2_SESSION_CATALOG_IMPLEMENTATION.key ->
classOf[V2CatalogSupportBuiltinDataSource].getName) {
@@ -3750,7 +3750,7 @@ class DataSourceV2SQLSuiteV1Filter
}
}
- val table2 = FullQualifiedTableName("testcat3", "default", "t")
+ val table2 = QualifiedTableName("testcat3", "default", "t")
withSQLConf(
"spark.sql.catalog.testcat3" ->
classOf[V2CatalogSupportBuiltinDataSource].getName) {
withTempPath { path =>
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 8307326f17fc..e07f6406901e 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.permission.{AclEntry, AclStatus}
import org.apache.spark.{SparkClassNotFoundException, SparkException,
SparkFiles, SparkRuntimeException}
import org.apache.spark.internal.config
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
-import org.apache.spark.sql.catalyst.{FullQualifiedTableName,
FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName,
TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -219,7 +219,7 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with
SharedSparkSession {
test("SPARK-25403 refresh the table after inserting data") {
withTable("t") {
val catalog = spark.sessionState.catalog
- val table = FullQualifiedTableName(
+ val table = QualifiedTableName(
CatalogManager.SESSION_CATALOG_NAME, catalog.getCurrentDatabase, "t")
sql("CREATE TABLE t (a INT) USING parquet")
sql("INSERT INTO TABLE t VALUES (1)")
@@ -233,7 +233,7 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with
SharedSparkSession {
withTable("t") {
withTempDir { dir =>
val catalog = spark.sessionState.catalog
- val table = FullQualifiedTableName(
+ val table = QualifiedTableName(
CatalogManager.SESSION_CATALOG_NAME, catalog.getCurrentDatabase, "t")
val p1 = s"${dir.getCanonicalPath}/p1"
val p2 = s"${dir.getCanonicalPath}/p2"
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/TruncateTableSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/TruncateTableSuite.scala
index 348b216aeb04..40ae35bbe8aa 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/TruncateTableSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/TruncateTableSuite.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.permission.{AclEntry, AclEntryScope, AclEntryType,
FsAction, FsPermission}
import org.apache.spark.sql.{AnalysisException, Row}
-import org.apache.spark.sql.catalyst.{FullQualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.execution.command
import org.apache.spark.sql.execution.command.FakeLocalFsFileSystem
@@ -148,7 +148,7 @@ trait TruncateTableSuiteBase extends
command.TruncateTableSuiteBase {
val catalog = spark.sessionState.catalog
val qualifiedTableName =
- FullQualifiedTableName(CatalogManager.SESSION_CATALOG_NAME, "ns",
"tbl")
+ QualifiedTableName(CatalogManager.SESSION_CATALOG_NAME, "ns",
"tbl")
val cachedPlan = catalog.getCachedTable(qualifiedTableName)
assert(cachedPlan.stats.sizeInBytes == 0)
}
diff --git
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 7873c36222da..1f87db31ffa5 100644
---
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -28,7 +28,7 @@ import org.apache.spark.SparkException
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.sql.{AnalysisException, SparkSession}
-import org.apache.spark.sql.catalyst.{FullQualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.types.DataTypeUtils
@@ -56,7 +56,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession:
SparkSession) extends Log
private val tableCreationLocks = Striped.lazyWeakLock(100)
/** Acquires a lock on the table cache for the duration of `f`. */
- private def withTableCreationLock[A](tableName: FullQualifiedTableName, f:
=> A): A = {
+ private def withTableCreationLock[A](tableName: QualifiedTableName, f: =>
A): A = {
val lock = tableCreationLocks.get(tableName)
lock.lock()
try f finally {
@@ -66,7 +66,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession:
SparkSession) extends Log
// For testing only
private[hive] def getCachedDataSourceTable(table: TableIdentifier):
LogicalPlan = {
- val key = FullQualifiedTableName(
+ val key = QualifiedTableName(
// scalastyle:off caselocale
table.catalog.getOrElse(CatalogManager.SESSION_CATALOG_NAME).toLowerCase,
table.database.getOrElse(sessionState.catalog.getCurrentDatabase).toLowerCase,
@@ -76,7 +76,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession:
SparkSession) extends Log
}
private def getCached(
- tableIdentifier: FullQualifiedTableName,
+ tableIdentifier: QualifiedTableName,
pathsInMetastore: Seq[Path],
schemaInMetastore: StructType,
expectedFileFormat: Class[_ <: FileFormat],
@@ -120,7 +120,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession:
SparkSession) extends Log
}
private def logWarningUnexpectedFileFormat(
- tableIdentifier: FullQualifiedTableName,
+ tableIdentifier: QualifiedTableName,
expectedFileFormat: Class[_ <: FileFormat],
actualFileFormat: String): Unit = {
logWarning(log"Table ${MDC(TABLE_NAME, tableIdentifier)} should be stored
as " +
@@ -201,7 +201,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession:
SparkSession) extends Log
fileType: String,
isWrite: Boolean): LogicalRelation = {
val metastoreSchema = relation.tableMeta.schema
- val tableIdentifier =
FullQualifiedTableName(relation.tableMeta.identifier.catalog.get,
+ val tableIdentifier =
QualifiedTableName(relation.tableMeta.identifier.catalog.get,
relation.tableMeta.database, relation.tableMeta.identifier.table)
val lazyPruningEnabled =
sparkSession.sessionState.conf.manageFilesourcePartitions
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]