This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new da8221738 [KYUUBI #5023] [KSHC] TableIdentify don't attach catalog
da8221738 is described below

commit da822173883a103dd38cd5583111f213433c9855
Author: yikaifei <[email protected]>
AuthorDate: Thu Jul 6 18:26:37 2023 +0800

    [KYUUBI #5023] [KSHC] TableIdentify don't attach catalog
    
    ### _Why are the changes needed?_
    
    As the title says, in KSHC the HiveTable identifier should not attach the catalog name; otherwise an incorrect catalogName is produced (the default session catalog name is "spark_catalog"). A short sketch of the pattern is shown below.
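    
    For context, a minimal hypothetical sketch of the pattern this patch applies (the wrapper `loadWithoutCatalogPrefix` below is illustrative only; in the patch the same `withSQLConf` wrapping lives inside `HiveTableCatalog` itself):
    
    ```scala
    import org.apache.spark.sql.connector.catalog.{Identifier, Table}
    import org.apache.kyuubi.spark.connector.hive.{HiveConnectorUtils, HiveTableCatalog}
    
    // Assumed: `catalog` is an initialized KSHC HiveTableCatalog and `ident` names
    // an existing table. Running the metastore lookup with the legacy flag enabled
    // keeps Spark 3.4 from attaching "spark_catalog" to the v1 TableIdentifier;
    // withSQLConf restores the previous value of the conf afterwards.
    def loadWithoutCatalogPrefix(catalog: HiveTableCatalog, ident: Identifier): Table =
      HiveConnectorUtils.withSQLConf("spark.sql.legacy.v1IdentifierNoCatalog" -> "true") {
        catalog.loadTable(ident) // table.name() should then parse as Seq("db", "table")
      }
    ```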
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    Closes #5023 from Yikf/tableName2.
    
    Closes #5023
    
    86b6a58d0 [yikaifei] KSHC v1IdentifierNoCatalog in spark3.4
    
    Authored-by: yikaifei <[email protected]>
    Signed-off-by: ulyssesyou <[email protected]>
---
 .../spark/connector/hive/HiveConnectorUtils.scala  |  26 ++
 .../spark/connector/hive/HiveTableCatalog.scala    | 357 +++++++++++----------
 .../spark/connector/hive/HiveCatalogSuite.scala    |   4 +-
 3 files changed, 217 insertions(+), 170 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
index 1d2d2b319..d0d0666bb 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, Col
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes, calculateSingleLocationSize}
 import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
 
 import org.apache.kyuubi.spark.connector.common.SparkUtils
@@ -253,4 +254,29 @@ object HiveConnectorUtils extends Logging {
 
     new StructType(newFields)
   }
+
+  def withSQLConf[T](pairs: (String, String)*)(f: => T): T = {
+    val conf = SQLConf.get
+    val (keys, values) = pairs.unzip
+    val currentValues = keys.map { key =>
+      if (conf.contains(key)) {
+        Some(conf.getConfString(key))
+      } else {
+        None
+      }
+    }
+    (keys, values).zipped.foreach { (k, v) =>
+      if (SQLConf.isStaticConfigKey(k)) {
+        throw KyuubiHiveConnectorException(s"Cannot modify the value of a static config: $k")
+      }
+      conf.setConfString(k, v)
+    }
+    try f
+    finally {
+      keys.zip(currentValues).foreach {
+        case (key, Some(value)) => conf.setConfString(key, value)
+        case (key, None) => conf.unsetConf(key)
+      }
+    }
+  }
 }
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
index cfc78940b..2d816c8c4 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, GLOB
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
+import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSQLConf
 import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper}
 import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature
 
@@ -56,6 +57,8 @@ class HiveTableCatalog(sparkSession: SparkSession)
 
   private val externalCatalogManager = ExternalCatalogManager.getOrCreate(sparkSession)
 
+  private val LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME = "spark.sql.legacy.v1IdentifierNoCatalog"
+
   private val sc = sparkSession.sparkContext
 
   private val sessionState = sparkSession.sessionState
@@ -143,129 +146,136 @@ class HiveTableCatalog(sparkSession: SparkSession)
 
   override val defaultNamespace: Array[String] = Array("default")
 
-  override def listTables(namespace: Array[String]): Array[Identifier] = {
-    namespace match {
-      case Array(db) =>
-        catalog
-          .listTables(db)
-          .map(ident => Identifier.of(ident.database.map(Array(_)).getOrElse(Array()), ident.table))
-          .toArray
-      case _ =>
-        throw new NoSuchNamespaceException(namespace)
+  override def listTables(namespace: Array[String]): Array[Identifier] =
+    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+      namespace match {
+        case Array(db) =>
+          catalog
+            .listTables(db)
+            .map(ident =>
+              Identifier.of(ident.database.map(Array(_)).getOrElse(Array()), ident.table))
+            .toArray
+        case _ =>
+          throw new NoSuchNamespaceException(namespace)
+      }
     }
-  }
 
-  override def loadTable(ident: Identifier): Table = {
-    HiveTable(sparkSession, catalog.getTableMetadata(ident.asTableIdentifier), this)
-  }
+  override def loadTable(ident: Identifier): Table =
+    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+      HiveTable(sparkSession, catalog.getTableMetadata(ident.asTableIdentifier), this)
+    }
 
   override def createTable(
       ident: Identifier,
       schema: StructType,
       partitions: Array[Transform],
-      properties: util.Map[String, String]): Table = {
-    import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.TransformHelper
-    val (partitionColumns, maybeBucketSpec) = partitions.toSeq.convertTransforms
-    val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName)
-    val tableProperties = properties.asScala
-    val location = Option(properties.get(TableCatalog.PROP_LOCATION))
-    val storage = DataSource.buildStorageFormatFromOptions(toOptions(tableProperties.toMap))
-      .copy(locationUri = location.map(CatalogUtils.stringToURI))
-    val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL)
-    val tableType =
-      if (isExternal || location.isDefined) {
-        CatalogTableType.EXTERNAL
-      } else {
-        CatalogTableType.MANAGED
+      properties: util.Map[String, String]): Table =
+    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+      import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.TransformHelper
+      val (partitionColumns, maybeBucketSpec) = partitions.toSeq.convertTransforms
+      val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName)
+      val tableProperties = properties.asScala
+      val location = Option(properties.get(TableCatalog.PROP_LOCATION))
+      val storage = DataSource.buildStorageFormatFromOptions(toOptions(tableProperties.toMap))
+        .copy(locationUri = location.map(CatalogUtils.stringToURI))
+      val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL)
+      val tableType =
+        if (isExternal || location.isDefined) {
+          CatalogTableType.EXTERNAL
+        } else {
+          CatalogTableType.MANAGED
+        }
+
+      val tableDesc = CatalogTable(
+        identifier = ident.asTableIdentifier,
+        tableType = tableType,
+        storage = storage,
+        schema = schema,
+        provider = Some(provider),
+        partitionColumnNames = partitionColumns,
+        bucketSpec = maybeBucketSpec,
+        properties = tableProperties.toMap,
+        tracksPartitionsInCatalog = conf.manageFilesourcePartitions,
+        comment = Option(properties.get(TableCatalog.PROP_COMMENT)))
+
+      try {
+        catalog.createTable(tableDesc, ignoreIfExists = false)
+      } catch {
+        case _: TableAlreadyExistsException =>
+          throw new TableAlreadyExistsException(ident)
       }
 
-    val tableDesc = CatalogTable(
-      identifier = ident.asTableIdentifier,
-      tableType = tableType,
-      storage = storage,
-      schema = schema,
-      provider = Some(provider),
-      partitionColumnNames = partitionColumns,
-      bucketSpec = maybeBucketSpec,
-      properties = tableProperties.toMap,
-      tracksPartitionsInCatalog = conf.manageFilesourcePartitions,
-      comment = Option(properties.get(TableCatalog.PROP_COMMENT)))
-
-    try {
-      catalog.createTable(tableDesc, ignoreIfExists = false)
-    } catch {
-      case _: TableAlreadyExistsException =>
-        throw new TableAlreadyExistsException(ident)
+      loadTable(ident)
     }
 
-    loadTable(ident)
-  }
+  override def alterTable(ident: Identifier, changes: TableChange*): Table =
+    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+      val catalogTable =
+        try {
+          catalog.getTableMetadata(ident.asTableIdentifier)
+        } catch {
+          case _: NoSuchTableException =>
+            throw new NoSuchTableException(ident)
+        }
+
+      val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
+      val schema = HiveConnectorUtils.applySchemaChanges(
+        catalogTable.schema,
+        changes)
+      val comment = properties.get(TableCatalog.PROP_COMMENT)
+      val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner)
+      val location = properties.get(TableCatalog.PROP_LOCATION).map(CatalogUtils.stringToURI)
+      val storage =
+        if (location.isDefined) {
+          catalogTable.storage.copy(locationUri = location)
+        } else {
+          catalogTable.storage
+        }
 
-  override def alterTable(ident: Identifier, changes: TableChange*): Table = {
-    val catalogTable =
       try {
-        catalog.getTableMetadata(ident.asTableIdentifier)
+        catalog.alterTable(
+          catalogTable.copy(
+            properties = properties,
+            schema = schema,
+            owner = owner,
+            comment = comment,
+            storage = storage))
       } catch {
         case _: NoSuchTableException =>
           throw new NoSuchTableException(ident)
       }
 
-    val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
-    val schema = HiveConnectorUtils.applySchemaChanges(
-      catalogTable.schema,
-      changes)
-    val comment = properties.get(TableCatalog.PROP_COMMENT)
-    val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner)
-    val location = properties.get(TableCatalog.PROP_LOCATION).map(CatalogUtils.stringToURI)
-    val storage =
-      if (location.isDefined) {
-        catalogTable.storage.copy(locationUri = location)
-      } else {
-        catalogTable.storage
-      }
-
-    try {
-      catalog.alterTable(
-        catalogTable.copy(
-          properties = properties,
-          schema = schema,
-          owner = owner,
-          comment = comment,
-          storage = storage))
-    } catch {
-      case _: NoSuchTableException =>
-        throw new NoSuchTableException(ident)
+      loadTable(ident)
     }
 
-    loadTable(ident)
-  }
-
-  override def dropTable(ident: Identifier): Boolean = {
-    try {
-      if (loadTable(ident) != null) {
-        catalog.dropTable(
-          ident.asTableIdentifier,
-          ignoreIfNotExists = true,
-          purge = true /* skip HDFS trash */ )
-        true
-      } else {
-        false
+  override def dropTable(ident: Identifier): Boolean =
+    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+      try {
+        if (loadTable(ident) != null) {
+          catalog.dropTable(
+            ident.asTableIdentifier,
+            ignoreIfNotExists = true,
+            purge = true /* skip HDFS trash */ )
+          true
+        } else {
+          false
+        }
+      } catch {
+        case _: NoSuchTableException =>
+          false
       }
-    } catch {
-      case _: NoSuchTableException =>
-        false
     }
-  }
 
-  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
-    if (tableExists(newIdent)) {
-      throw new TableAlreadyExistsException(newIdent)
-    }
+  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit =
+    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+      if (tableExists(newIdent)) {
+        throw new TableAlreadyExistsException(newIdent)
+      }
 
-    // Load table to make sure the table exists
-    loadTable(oldIdent)
-    catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier)
-  }
+      // Load table to make sure the table exists
+      loadTable(oldIdent)
+      catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier)
+    }
 
   private def toOptions(properties: Map[String, String]): Map[String, String] = {
     properties.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map {
@@ -273,70 +283,78 @@ class HiveTableCatalog(sparkSession: SparkSession)
     }
   }
 
-  override def listNamespaces(): Array[Array[String]] = {
-    catalog.listDatabases().map(Array(_)).toArray
-  }
-
-  override def listNamespaces(namespace: Array[String]): Array[Array[String]] = {
-    namespace match {
-      case Array() =>
-        listNamespaces()
-      case Array(db) if catalog.databaseExists(db) =>
-        Array()
-      case _ =>
-        throw new NoSuchNamespaceException(namespace)
+  override def listNamespaces(): Array[Array[String]] =
+    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+      catalog.listDatabases().map(Array(_)).toArray
     }
-  }
 
-  override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] = {
-    namespace match {
-      case Array(db) =>
-        try {
-          catalog.getDatabaseMetadata(db).toMetadata
-        } catch {
-          case _: NoSuchDatabaseException =>
-            throw new NoSuchNamespaceException(namespace)
-        }
+  override def listNamespaces(namespace: Array[String]): Array[Array[String]] =
+    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+      namespace match {
+        case Array() =>
+          listNamespaces()
+        case Array(db) if catalog.databaseExists(db) =>
+          Array()
+        case _ =>
+          throw new NoSuchNamespaceException(namespace)
+      }
+    }
 
-      case _ =>
-        throw new NoSuchNamespaceException(namespace)
+  override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] =
+    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+      namespace match {
+        case Array(db) =>
+          try {
+            catalog.getDatabaseMetadata(db).toMetadata
+          } catch {
+            case _: NoSuchDatabaseException =>
+              throw new NoSuchNamespaceException(namespace)
+          }
+
+        case _ =>
+          throw new NoSuchNamespaceException(namespace)
+      }
     }
-  }
 
   override def createNamespace(
       namespace: Array[String],
-      metadata: util.Map[String, String]): Unit = namespace match {
-    case Array(db) if !catalog.databaseExists(db) =>
-      catalog.createDatabase(
-        toCatalogDatabase(db, metadata, defaultLocation = Some(catalog.getDefaultDBPath(db))),
-        ignoreIfExists = false)
-
-    case Array(_) =>
-      throw new NamespaceAlreadyExistsException(namespace)
-
-    case _ =>
-      throw new IllegalArgumentException(s"Invalid namespace name: ${namespace.quoted}")
-  }
-
-  override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = {
-    namespace match {
-      case Array(db) =>
-        // validate that this catalog's reserved properties are not removed
-        changes.foreach {
-          case remove: RemoveProperty if NAMESPACE_RESERVED_PROPERTIES.contains(remove.property) =>
-            throw new UnsupportedOperationException(
-              s"Cannot remove reserved property: ${remove.property}")
-          case _ =>
-        }
-
-        val metadata = catalog.getDatabaseMetadata(db).toMetadata
-        catalog.alterDatabase(
-          toCatalogDatabase(db, CatalogV2Util.applyNamespaceChanges(metadata, changes)))
+      metadata: util.Map[String, String]): Unit =
+    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+      namespace match {
+        case Array(db) if !catalog.databaseExists(db) =>
+          catalog.createDatabase(
+            toCatalogDatabase(db, metadata, defaultLocation = Some(catalog.getDefaultDBPath(db))),
+            ignoreIfExists = false)
+
+        case Array(_) =>
+          throw new NamespaceAlreadyExistsException(namespace)
+
+        case _ =>
+          throw new IllegalArgumentException(s"Invalid namespace name: ${namespace.quoted}")
+      }
+    }
 
-      case _ =>
-        throw new NoSuchNamespaceException(namespace)
+  override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit =
+    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+      namespace match {
+        case Array(db) =>
+          // validate that this catalog's reserved properties are not removed
+          changes.foreach {
+            case remove: RemoveProperty
+                if NAMESPACE_RESERVED_PROPERTIES.contains(remove.property) =>
+              throw new UnsupportedOperationException(
+                s"Cannot remove reserved property: ${remove.property}")
+            case _ =>
+          }
+
+          val metadata = catalog.getDatabaseMetadata(db).toMetadata
+          catalog.alterDatabase(
+            toCatalogDatabase(db, CatalogV2Util.applyNamespaceChanges(metadata, changes)))
+
+        case _ =>
+          throw new NoSuchNamespaceException(namespace)
+      }
     }
-  }
 
   /**
    * List the metadata of partitions that belong to the specified table, assuming it exists, that
@@ -356,21 +374,24 @@ class HiveTableCatalog(sparkSession: SparkSession)
 
   override def dropNamespace(
       namespace: Array[String],
-      cascade: Boolean): Boolean = namespace match {
-    case Array(db) if catalog.databaseExists(db) =>
-      if (catalog.listTables(db).nonEmpty && !cascade) {
-        throw new IllegalStateException(s"Namespace ${namespace.quoted} is not empty")
+      cascade: Boolean): Boolean =
+    withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+      namespace match {
+        case Array(db) if catalog.databaseExists(db) =>
+          if (catalog.listTables(db).nonEmpty && !cascade) {
+            throw new IllegalStateException(s"Namespace ${namespace.quoted} is not empty")
+          }
+          catalog.dropDatabase(db, ignoreIfNotExists = false, cascade)
+          true
+
+        case Array(_) =>
+          // exists returned false
+          false
+
+        case _ =>
+          throw new NoSuchNamespaceException(namespace)
       }
-      catalog.dropDatabase(db, ignoreIfNotExists = false, cascade)
-      true
-
-    case Array(_) =>
-      // exists returned false
-      false
-
-    case _ =>
-      throw new NoSuchNamespaceException(namespace)
-  }
+    }
 }
 
 private object HiveTableCatalog {
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
index c1575018e..5e0be5ce6 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
@@ -164,7 +164,7 @@ class HiveCatalogSuite extends KyuubiHiveTest {
       catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
 
     val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
-    assert(parsed == Seq("db", "test_table") || parsed == Seq("spark_catalog", "db", "test_table"))
+    assert(parsed == Seq("db", "test_table"))
     assert(table.schema == schema)
     assert(filterV2TableProperties(table.properties) == Map())
 
@@ -181,7 +181,7 @@ class HiveCatalogSuite extends KyuubiHiveTest {
     val table = catalog.createTable(testIdent, schema, Array.empty[Transform], properties)
 
     val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
-    assert(parsed == Seq("db", "test_table") || parsed == Seq("spark_catalog", "db", "test_table"))
+    assert(parsed == Seq("db", "test_table"))
     assert(table.schema == schema)
     assert(filterV2TableProperties(table.properties).asJava == properties)
 
