This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fc9d421a2345 [SPARK-49211][SQL][FOLLOW-UP] Support catalog in QualifiedTableName
fc9d421a2345 is described below

commit fc9d421a2345987105aa97947c867ac80ba48a05
Author: Rui Wang <[email protected]>
AuthorDate: Fri Sep 27 08:26:24 2024 +0800

    [SPARK-49211][SQL][FOLLOW-UP] Support catalog in QualifiedTableName
    
    ### What changes were proposed in this pull request?
    
    Support catalog in QualifiedTableName and remove `FullQualifiedTableName`.
    
    ### Why are the changes needed?
    
    Consolidate `QualifiedTableName` and `FullQualifiedTableName` into a single class to remove duplicate code.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #48255 from amaliujia/qualifedtablename.
    
    Authored-by: Rui Wang <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/catalog/SessionCatalog.scala | 18 +++++++++---------
 .../org/apache/spark/sql/catalyst/identifiers.scala | 21 +++++++++++++++++----
 .../sql/catalyst/catalog/SessionCatalogSuite.scala  |  8 ++++----
 .../execution/datasources/DataSourceStrategy.scala  |  4 ++--
 .../execution/datasources/v2/V2SessionCatalog.scala |  4 ++--
 .../spark/sql/StatisticsCollectionTestBase.scala    |  4 ++--
 .../spark/sql/connector/DataSourceV2SQLSuite.scala  | 10 +++++-----
 .../spark/sql/execution/command/DDLSuite.scala      |  6 +++---
 .../execution/command/v1/TruncateTableSuite.scala   |  4 ++--
 .../spark/sql/hive/HiveMetastoreCatalog.scala       | 12 ++++++------
 10 files changed, 52 insertions(+), 39 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index d3a6cb6ae284..a0f7af10fefa 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -197,7 +197,7 @@ class SessionCatalog(
     }
   }
 
-  private val tableRelationCache: Cache[FullQualifiedTableName, LogicalPlan] = 
{
+  private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
     var builder = CacheBuilder.newBuilder()
       .maximumSize(cacheSize)
 
@@ -205,33 +205,33 @@ class SessionCatalog(
       builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
     }
 
-    builder.build[FullQualifiedTableName, LogicalPlan]()
+    builder.build[QualifiedTableName, LogicalPlan]()
   }
 
   /** This method provides a way to get a cached plan. */
-  def getCachedPlan(t: FullQualifiedTableName, c: Callable[LogicalPlan]): 
LogicalPlan = {
+  def getCachedPlan(t: QualifiedTableName, c: Callable[LogicalPlan]): 
LogicalPlan = {
     tableRelationCache.get(t, c)
   }
 
   /** This method provides a way to get a cached plan if the key exists. */
-  def getCachedTable(key: FullQualifiedTableName): LogicalPlan = {
+  def getCachedTable(key: QualifiedTableName): LogicalPlan = {
     tableRelationCache.getIfPresent(key)
   }
 
   /** This method provides a way to cache a plan. */
-  def cacheTable(t: FullQualifiedTableName, l: LogicalPlan): Unit = {
+  def cacheTable(t: QualifiedTableName, l: LogicalPlan): Unit = {
     tableRelationCache.put(t, l)
   }
 
   /** This method provides a way to invalidate a cached plan. */
-  def invalidateCachedTable(key: FullQualifiedTableName): Unit = {
+  def invalidateCachedTable(key: QualifiedTableName): Unit = {
     tableRelationCache.invalidate(key)
   }
 
   /** This method discards any cached table relation plans for the given table 
identifier. */
   def invalidateCachedTable(name: TableIdentifier): Unit = {
     val qualified = qualifyIdentifier(name)
-    invalidateCachedTable(FullQualifiedTableName(
+    invalidateCachedTable(QualifiedTableName(
       qualified.catalog.get, qualified.database.get, qualified.table))
   }
 
@@ -301,7 +301,7 @@ class SessionCatalog(
     }
     if (cascade && databaseExists(dbName)) {
       listTables(dbName).foreach { t =>
-        invalidateCachedTable(FullQualifiedTableName(SESSION_CATALOG_NAME, 
dbName, t.table))
+        invalidateCachedTable(QualifiedTableName(SESSION_CATALOG_NAME, dbName, 
t.table))
       }
     }
     externalCatalog.dropDatabase(dbName, ignoreIfNotExists, cascade)
@@ -1183,7 +1183,7 @@ class SessionCatalog(
   def refreshTable(name: TableIdentifier): Unit = synchronized {
     getLocalOrGlobalTempView(name).map(_.refresh()).getOrElse {
       val qualifiedIdent = qualifyIdentifier(name)
-      val qualifiedTableName = FullQualifiedTableName(
+      val qualifiedTableName = QualifiedTableName(
         qualifiedIdent.catalog.get, qualifiedIdent.database.get, 
qualifiedIdent.table)
       tableRelationCache.invalidate(qualifiedTableName)
     }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
index cc881539002b..ceced9313940 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst
 
+import org.apache.spark.sql.connector.catalog.CatalogManager
+
 /**
  * An identifier that optionally specifies a database.
  *
@@ -107,14 +109,25 @@ case class TableIdentifier(table: String, database: 
Option[String], catalog: Opt
 }
 
 /** A fully qualified identifier for a table (i.e., database.tableName) */
-case class QualifiedTableName(database: String, name: String) {
-  override def toString: String = s"$database.$name"
-}
+case class QualifiedTableName(catalog: String, database: String, name: String) 
{
+  /** Two argument ctor for backward compatibility. */
+  def this(database: String, name: String) = this(
+    catalog = CatalogManager.SESSION_CATALOG_NAME,
+    database = database,
+    name = name)
 
-case class FullQualifiedTableName(catalog: String, database: String, name: 
String) {
   override def toString: String = s"$catalog.$database.$name"
 }
 
+object QualifiedTableName {
+  def apply(catalog: String, database: String, name: String): 
QualifiedTableName = {
+    new QualifiedTableName(catalog, database, name)
+  }
+
+  def apply(database: String, name: String): QualifiedTableName =
+    new QualifiedTableName(database = database, name = name)
+}
+
 object TableIdentifier {
   def apply(tableName: String): TableIdentifier = new 
TableIdentifier(tableName)
   def apply(table: String, database: Option[String]): TableIdentifier =
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index fbe63f71ae02..cfbc507fb5c7 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -22,7 +22,7 @@ import scala.concurrent.duration._
 import org.scalatest.concurrent.Eventually
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{AliasIdentifier, FullQualifiedTableName, 
FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{AliasIdentifier, FunctionIdentifier, 
QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -1883,7 +1883,7 @@ abstract class SessionCatalogSuite extends AnalysisTest 
with Eventually {
     conf.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L)
 
     withConfAndEmptyCatalog(conf) { catalog =>
-      val table = FullQualifiedTableName(
+      val table = QualifiedTableName(
         CatalogManager.SESSION_CATALOG_NAME, catalog.getCurrentDatabase, 
"test")
 
       // First, make sure the test table is not cached.
@@ -1903,14 +1903,14 @@ abstract class SessionCatalogSuite extends AnalysisTest 
with Eventually {
   test("SPARK-34197: refreshTable should not invalidate the relation cache for 
temporary views") {
     withBasicCatalog { catalog =>
       createTempView(catalog, "tbl1", Range(1, 10, 1, 10), false)
-      val qualifiedName1 = FullQualifiedTableName(SESSION_CATALOG_NAME, 
"default", "tbl1")
+      val qualifiedName1 = QualifiedTableName(SESSION_CATALOG_NAME, "default", 
"tbl1")
       catalog.cacheTable(qualifiedName1, Range(1, 10, 1, 10))
       catalog.refreshTable(TableIdentifier("tbl1"))
       assert(catalog.getCachedTable(qualifiedName1) != null)
 
       createGlobalTempView(catalog, "tbl2", Range(2, 10, 1, 10), false)
       val qualifiedName2 =
-        FullQualifiedTableName(SESSION_CATALOG_NAME, 
catalog.globalTempDatabase, "tbl2")
+        QualifiedTableName(SESSION_CATALOG_NAME, catalog.globalTempDatabase, 
"tbl2")
       catalog.cacheTable(qualifiedName2, Range(2, 10, 1, 10))
       catalog.refreshTable(TableIdentifier("tbl2", 
Some(catalog.globalTempDatabase)))
       assert(catalog.getCachedTable(qualifiedName2) != null)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 2be4b236872f..a2707da2d102 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -28,7 +28,7 @@ import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.PREDICATES
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, 
FullQualifiedTableName, InternalRow, SQLConfHelper}
+import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, 
InternalRow, QualifiedTableName, SQLConfHelper}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog._
@@ -249,7 +249,7 @@ class FindDataSourceTable(sparkSession: SparkSession) 
extends Rule[LogicalPlan]
   private def readDataSourceTable(
       table: CatalogTable, extraOptions: CaseInsensitiveStringMap): 
LogicalPlan = {
     val qualifiedTableName =
-      FullQualifiedTableName(table.identifier.catalog.get, table.database, 
table.identifier.table)
+      QualifiedTableName(table.identifier.catalog.get, table.database, 
table.identifier.table)
     val catalog = sparkSession.sessionState.catalog
     val dsOptions = DataSourceUtils.generateDatasourceOptions(extraOptions, 
table)
     catalog.getCachedPlan(qualifiedTableName, () => {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index bd1df87d15c3..22c13fd98ced 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.SparkUnsupportedOperationException
-import org.apache.spark.sql.catalyst.{FullQualifiedTableName, 
FunctionIdentifier, SQLConfHelper, TableIdentifier}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, 
SQLConfHelper, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, 
NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, 
CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils, 
ClusterBySpec, SessionCatalog}
 import org.apache.spark.sql.catalyst.util.TypeUtils._
@@ -93,7 +93,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
       // table here. To avoid breaking it we do not resolve the table provider 
and still return
       // `V1Table` if the custom session catalog is present.
       if (table.provider.isDefined && !hasCustomSessionCatalog) {
-        val qualifiedTableName = FullQualifiedTableName(
+        val qualifiedTableName = QualifiedTableName(
           table.identifier.catalog.get, table.database, table.identifier.table)
         // Check if the table is in the v1 table cache to skip the v2 table 
lookup.
         if (catalog.getCachedTable(qualifiedTableName) != null) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
index 7fa29dd38fd9..74329ac0e0d2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -25,7 +25,7 @@ import java.time.LocalDateTime
 import scala.collection.mutable
 import scala.util.Random
 
-import org.apache.spark.sql.catalyst.{FullQualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, 
CatalogStatistics, CatalogTable, HiveTableRelation}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.AttributeMap
@@ -270,7 +270,7 @@ abstract class StatisticsCollectionTestBase extends 
QueryTest with SQLTestUtils
 
   def getTableFromCatalogCache(tableName: String): LogicalPlan = {
     val catalog = spark.sessionState.catalog
-    val qualifiedTableName = FullQualifiedTableName(
+    val qualifiedTableName = QualifiedTableName(
       CatalogManager.SESSION_CATALOG_NAME, catalog.getCurrentDatabase, 
tableName)
     catalog.getCachedTable(qualifiedTableName)
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 7aaec6d500ba..dac066bbef83 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -27,7 +27,7 @@ import scala.jdk.CollectionConverters._
 
 import org.apache.spark.{SparkException, SparkRuntimeException, 
SparkUnsupportedOperationException}
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{FullQualifiedTableName, InternalRow, 
TableIdentifier}
+import org.apache.spark.sql.catalyst.{InternalRow, QualifiedTableName, 
TableIdentifier}
 import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER
 import 
org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, 
NoSuchNamespaceException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable, CatalogTableType, CatalogUtils}
@@ -3713,7 +3713,7 @@ class DataSourceV2SQLSuiteV1Filter
 
     // Reset CatalogManager to clear the materialized `spark_catalog` 
instance, so that we can
     // configure a new implementation.
-    val table1 = FullQualifiedTableName(SESSION_CATALOG_NAME, "default", "t")
+    val table1 = QualifiedTableName(SESSION_CATALOG_NAME, "default", "t")
     spark.sessionState.catalogManager.reset()
     withSQLConf(
       V2_SESSION_CATALOG_IMPLEMENTATION.key ->
@@ -3722,7 +3722,7 @@ class DataSourceV2SQLSuiteV1Filter
         checkParquet(table1.toString, path.getAbsolutePath)
       }
     }
-    val table2 = FullQualifiedTableName("testcat3", "default", "t")
+    val table2 = QualifiedTableName("testcat3", "default", "t")
     withSQLConf(
       "spark.sql.catalog.testcat3" -> 
classOf[V2CatalogSupportBuiltinDataSource].getName) {
       withTempPath { path =>
@@ -3741,7 +3741,7 @@ class DataSourceV2SQLSuiteV1Filter
     // Reset CatalogManager to clear the materialized `spark_catalog` 
instance, so that we can
     // configure a new implementation.
     spark.sessionState.catalogManager.reset()
-    val table1 = FullQualifiedTableName(SESSION_CATALOG_NAME, "default", "t")
+    val table1 = QualifiedTableName(SESSION_CATALOG_NAME, "default", "t")
     withSQLConf(
       V2_SESSION_CATALOG_IMPLEMENTATION.key ->
         classOf[V2CatalogSupportBuiltinDataSource].getName) {
@@ -3750,7 +3750,7 @@ class DataSourceV2SQLSuiteV1Filter
       }
     }
 
-    val table2 = FullQualifiedTableName("testcat3", "default", "t")
+    val table2 = QualifiedTableName("testcat3", "default", "t")
     withSQLConf(
       "spark.sql.catalog.testcat3" -> 
classOf[V2CatalogSupportBuiltinDataSource].getName) {
       withTempPath { path =>
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 8307326f17fc..e07f6406901e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.permission.{AclEntry, AclStatus}
 import org.apache.spark.{SparkClassNotFoundException, SparkException, 
SparkFiles, SparkRuntimeException}
 import org.apache.spark.internal.config
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
-import org.apache.spark.sql.catalyst.{FullQualifiedTableName, 
FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, 
TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -219,7 +219,7 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with 
SharedSparkSession {
   test("SPARK-25403 refresh the table after inserting data") {
     withTable("t") {
       val catalog = spark.sessionState.catalog
-      val table = FullQualifiedTableName(
+      val table = QualifiedTableName(
         CatalogManager.SESSION_CATALOG_NAME, catalog.getCurrentDatabase, "t")
       sql("CREATE TABLE t (a INT) USING parquet")
       sql("INSERT INTO TABLE t VALUES (1)")
@@ -233,7 +233,7 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with 
SharedSparkSession {
     withTable("t") {
       withTempDir { dir =>
         val catalog = spark.sessionState.catalog
-        val table = FullQualifiedTableName(
+        val table = QualifiedTableName(
           CatalogManager.SESSION_CATALOG_NAME, catalog.getCurrentDatabase, "t")
         val p1 = s"${dir.getCanonicalPath}/p1"
         val p2 = s"${dir.getCanonicalPath}/p2"
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/TruncateTableSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/TruncateTableSuite.scala
index 348b216aeb04..40ae35bbe8aa 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/TruncateTableSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/TruncateTableSuite.scala
@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.fs.permission.{AclEntry, AclEntryScope, AclEntryType, 
FsAction, FsPermission}
 
 import org.apache.spark.sql.{AnalysisException, Row}
-import org.apache.spark.sql.catalyst.{FullQualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.execution.command
 import org.apache.spark.sql.execution.command.FakeLocalFsFileSystem
@@ -148,7 +148,7 @@ trait TruncateTableSuiteBase extends 
command.TruncateTableSuiteBase {
 
           val catalog = spark.sessionState.catalog
           val qualifiedTableName =
-            FullQualifiedTableName(CatalogManager.SESSION_CATALOG_NAME, "ns", 
"tbl")
+            QualifiedTableName(CatalogManager.SESSION_CATALOG_NAME, "ns", 
"tbl")
           val cachedPlan = catalog.getCachedTable(qualifiedTableName)
           assert(cachedPlan.stats.sizeInBytes == 0)
         }
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 7873c36222da..1f87db31ffa5 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -28,7 +28,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.sql.{AnalysisException, SparkSession}
-import org.apache.spark.sql.catalyst.{FullQualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
@@ -56,7 +56,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
   private val tableCreationLocks = Striped.lazyWeakLock(100)
 
   /** Acquires a lock on the table cache for the duration of `f`. */
-  private def withTableCreationLock[A](tableName: FullQualifiedTableName, f: 
=> A): A = {
+  private def withTableCreationLock[A](tableName: QualifiedTableName, f: => 
A): A = {
     val lock = tableCreationLocks.get(tableName)
     lock.lock()
     try f finally {
@@ -66,7 +66,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
 
   // For testing only
   private[hive] def getCachedDataSourceTable(table: TableIdentifier): 
LogicalPlan = {
-    val key = FullQualifiedTableName(
+    val key = QualifiedTableName(
       // scalastyle:off caselocale
       table.catalog.getOrElse(CatalogManager.SESSION_CATALOG_NAME).toLowerCase,
       
table.database.getOrElse(sessionState.catalog.getCurrentDatabase).toLowerCase,
@@ -76,7 +76,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
   }
 
   private def getCached(
-      tableIdentifier: FullQualifiedTableName,
+      tableIdentifier: QualifiedTableName,
       pathsInMetastore: Seq[Path],
       schemaInMetastore: StructType,
       expectedFileFormat: Class[_ <: FileFormat],
@@ -120,7 +120,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
   }
 
   private def logWarningUnexpectedFileFormat(
-      tableIdentifier: FullQualifiedTableName,
+      tableIdentifier: QualifiedTableName,
       expectedFileFormat: Class[_ <: FileFormat],
       actualFileFormat: String): Unit = {
     logWarning(log"Table ${MDC(TABLE_NAME, tableIdentifier)} should be stored 
as " +
@@ -201,7 +201,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
       fileType: String,
       isWrite: Boolean): LogicalRelation = {
     val metastoreSchema = relation.tableMeta.schema
-    val tableIdentifier = 
FullQualifiedTableName(relation.tableMeta.identifier.catalog.get,
+    val tableIdentifier = 
QualifiedTableName(relation.tableMeta.identifier.catalog.get,
       relation.tableMeta.database, relation.tableMeta.identifier.table)
 
     val lazyPruningEnabled = 
sparkSession.sessionState.conf.manageFilesourcePartitions


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to