This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new da8221738 [KYUUBI #5023] [KSHC] TableIdentify don't attach catalog
da8221738 is described below
commit da822173883a103dd38cd5583111f213433c9855
Author: yikaifei <[email protected]>
AuthorDate: Thu Jul 6 18:26:37 2023 +0800
[KYUUBI #5023] [KSHC] TableIdentify don't attach catalog
### _Why are the changes needed?_
As the title says: in KSHC, a HiveTable's identifier should not attach the catalog, to prevent reporting an incorrect catalog name (the default catalog is "spark_catalog").
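Concretely, every `HiveTableCatalog` operation now runs with Spark 3.4's `spark.sql.legacy.v1IdentifierNoCatalog` flag forced to `true`, so V1 identifiers come back as `db.table` rather than `spark_catalog.db.table`. A short sketch of the observable effect (illustrative only; `catalog`, `testIdent`, `schema` and `emptyProps` stand for the fixtures used in `HiveCatalogSuite`):

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.connector.expressions.Transform

// Illustrative only: after this change the table name no longer carries the
// catalog prefix, regardless of the session's current catalog.
val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
assert(parsed == Seq("db", "test_table")) // previously could be Seq("spark_catalog", "db", "test_table")
```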
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
Closes #5023 from Yikf/tableName2.
Closes #5023
86b6a58d0 [yikaifei] KSHC v1IdentifierNoCatalog in spark3.4
Authored-by: yikaifei <[email protected]>
Signed-off-by: ulyssesyou <[email protected]>
---
.../spark/connector/hive/HiveConnectorUtils.scala | 26 ++
.../spark/connector/hive/HiveTableCatalog.scala | 357 +++++++++++----------
.../spark/connector/hive/HiveCatalogSuite.scala | 4 +-
3 files changed, 217 insertions(+), 170 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
index 1d2d2b319..d0d0666bb 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, Col
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes, calculateSingleLocationSize}
import org.apache.spark.sql.execution.datasources.PartitionedFile
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
import org.apache.kyuubi.spark.connector.common.SparkUtils
@@ -253,4 +254,29 @@ object HiveConnectorUtils extends Logging {
new StructType(newFields)
}
+
+ def withSQLConf[T](pairs: (String, String)*)(f: => T): T = {
+ val conf = SQLConf.get
+ val (keys, values) = pairs.unzip
+ val currentValues = keys.map { key =>
+ if (conf.contains(key)) {
+ Some(conf.getConfString(key))
+ } else {
+ None
+ }
+ }
+ (keys, values).zipped.foreach { (k, v) =>
+ if (SQLConf.isStaticConfigKey(k)) {
+ throw KyuubiHiveConnectorException(s"Cannot modify the value of a static config: $k")
+ }
+ conf.setConfString(k, v)
+ }
+ try f
+ finally {
+ keys.zip(currentValues).foreach {
+ case (key, Some(value)) => conf.setConfString(key, value)
+ case (key, None) => conf.unsetConf(key)
+ }
+ }
+ }
}
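The helper above overrides each key only for the duration of the block, restores or unsets it afterwards, and rejects static config keys. A minimal usage sketch (illustrative only; "some.dynamic.key" is a hypothetical key, not one used by the patch):

```scala
import org.apache.spark.sql.internal.SQLConf
import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils

// Illustrative only, not part of the patch.
HiveConnectorUtils.withSQLConf("some.dynamic.key" -> "overridden") {
  // The override is visible only inside this block.
  assert(SQLConf.get.getConfString("some.dynamic.key") == "overridden")
}
// On exit the previous value is restored (or the key unset if it was absent);
// passing a static config key raises KyuubiHiveConnectorException instead.
```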
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
index cfc78940b..2d816c8c4 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, GLOB
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSQLConf
import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper}
import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature
@@ -56,6 +57,8 @@ class HiveTableCatalog(sparkSession: SparkSession)
private val externalCatalogManager = ExternalCatalogManager.getOrCreate(sparkSession)
+ private val LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME = "spark.sql.legacy.v1IdentifierNoCatalog"
+
private val sc = sparkSession.sparkContext
private val sessionState = sparkSession.sessionState
@@ -143,129 +146,136 @@ class HiveTableCatalog(sparkSession: SparkSession)
override val defaultNamespace: Array[String] = Array("default")
- override def listTables(namespace: Array[String]): Array[Identifier] = {
- namespace match {
- case Array(db) =>
- catalog
- .listTables(db)
- .map(ident => Identifier.of(ident.database.map(Array(_)).getOrElse(Array()), ident.table))
- .toArray
- case _ =>
- throw new NoSuchNamespaceException(namespace)
+ override def listTables(namespace: Array[String]): Array[Identifier] =
+ withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ namespace match {
+ case Array(db) =>
+ catalog
+ .listTables(db)
+ .map(ident =>
+ Identifier.of(ident.database.map(Array(_)).getOrElse(Array()), ident.table))
+ .toArray
+ case _ =>
+ throw new NoSuchNamespaceException(namespace)
+ }
}
- }
- override def loadTable(ident: Identifier): Table = {
- HiveTable(sparkSession, catalog.getTableMetadata(ident.asTableIdentifier), this)
- }
+ override def loadTable(ident: Identifier): Table =
+ withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ HiveTable(sparkSession, catalog.getTableMetadata(ident.asTableIdentifier), this)
+ }
override def createTable(
ident: Identifier,
schema: StructType,
partitions: Array[Transform],
- properties: util.Map[String, String]): Table = {
- import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.TransformHelper
- val (partitionColumns, maybeBucketSpec) = partitions.toSeq.convertTransforms
- val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName)
- val tableProperties = properties.asScala
- val location = Option(properties.get(TableCatalog.PROP_LOCATION))
- val storage = DataSource.buildStorageFormatFromOptions(toOptions(tableProperties.toMap))
- .copy(locationUri = location.map(CatalogUtils.stringToURI))
- val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL)
- val tableType =
- if (isExternal || location.isDefined) {
- CatalogTableType.EXTERNAL
- } else {
- CatalogTableType.MANAGED
+ properties: util.Map[String, String]): Table =
+ withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.TransformHelper
+ val (partitionColumns, maybeBucketSpec) = partitions.toSeq.convertTransforms
+ val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName)
+ val tableProperties = properties.asScala
+ val location = Option(properties.get(TableCatalog.PROP_LOCATION))
+ val storage = DataSource.buildStorageFormatFromOptions(toOptions(tableProperties.toMap))
+ .copy(locationUri = location.map(CatalogUtils.stringToURI))
+ val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL)
+ val tableType =
+ if (isExternal || location.isDefined) {
+ CatalogTableType.EXTERNAL
+ } else {
+ CatalogTableType.MANAGED
+ }
+
+ val tableDesc = CatalogTable(
+ identifier = ident.asTableIdentifier,
+ tableType = tableType,
+ storage = storage,
+ schema = schema,
+ provider = Some(provider),
+ partitionColumnNames = partitionColumns,
+ bucketSpec = maybeBucketSpec,
+ properties = tableProperties.toMap,
+ tracksPartitionsInCatalog = conf.manageFilesourcePartitions,
+ comment = Option(properties.get(TableCatalog.PROP_COMMENT)))
+
+ try {
+ catalog.createTable(tableDesc, ignoreIfExists = false)
+ } catch {
+ case _: TableAlreadyExistsException =>
+ throw new TableAlreadyExistsException(ident)
}
- val tableDesc = CatalogTable(
- identifier = ident.asTableIdentifier,
- tableType = tableType,
- storage = storage,
- schema = schema,
- provider = Some(provider),
- partitionColumnNames = partitionColumns,
- bucketSpec = maybeBucketSpec,
- properties = tableProperties.toMap,
- tracksPartitionsInCatalog = conf.manageFilesourcePartitions,
- comment = Option(properties.get(TableCatalog.PROP_COMMENT)))
-
- try {
- catalog.createTable(tableDesc, ignoreIfExists = false)
- } catch {
- case _: TableAlreadyExistsException =>
- throw new TableAlreadyExistsException(ident)
+ loadTable(ident)
}
- loadTable(ident)
- }
+ override def alterTable(ident: Identifier, changes: TableChange*): Table =
+ withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ val catalogTable =
+ try {
+ catalog.getTableMetadata(ident.asTableIdentifier)
+ } catch {
+ case _: NoSuchTableException =>
+ throw new NoSuchTableException(ident)
+ }
+
+ val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
+ val schema = HiveConnectorUtils.applySchemaChanges(
+ catalogTable.schema,
+ changes)
+ val comment = properties.get(TableCatalog.PROP_COMMENT)
+ val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner)
+ val location = properties.get(TableCatalog.PROP_LOCATION).map(CatalogUtils.stringToURI)
+ val storage =
+ if (location.isDefined) {
+ catalogTable.storage.copy(locationUri = location)
+ } else {
+ catalogTable.storage
+ }
- override def alterTable(ident: Identifier, changes: TableChange*): Table = {
- val catalogTable =
try {
- catalog.getTableMetadata(ident.asTableIdentifier)
+ catalog.alterTable(
+ catalogTable.copy(
+ properties = properties,
+ schema = schema,
+ owner = owner,
+ comment = comment,
+ storage = storage))
} catch {
case _: NoSuchTableException =>
throw new NoSuchTableException(ident)
}
- val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
- val schema = HiveConnectorUtils.applySchemaChanges(
- catalogTable.schema,
- changes)
- val comment = properties.get(TableCatalog.PROP_COMMENT)
- val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner)
- val location = properties.get(TableCatalog.PROP_LOCATION).map(CatalogUtils.stringToURI)
- val storage =
- if (location.isDefined) {
- catalogTable.storage.copy(locationUri = location)
- } else {
- catalogTable.storage
- }
-
- try {
- catalog.alterTable(
- catalogTable.copy(
- properties = properties,
- schema = schema,
- owner = owner,
- comment = comment,
- storage = storage))
- } catch {
- case _: NoSuchTableException =>
- throw new NoSuchTableException(ident)
+ loadTable(ident)
}
- loadTable(ident)
- }
-
- override def dropTable(ident: Identifier): Boolean = {
- try {
- if (loadTable(ident) != null) {
- catalog.dropTable(
- ident.asTableIdentifier,
- ignoreIfNotExists = true,
- purge = true /* skip HDFS trash */ )
- true
- } else {
- false
+ override def dropTable(ident: Identifier): Boolean =
+ withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ try {
+ if (loadTable(ident) != null) {
+ catalog.dropTable(
+ ident.asTableIdentifier,
+ ignoreIfNotExists = true,
+ purge = true /* skip HDFS trash */ )
+ true
+ } else {
+ false
+ }
+ } catch {
+ case _: NoSuchTableException =>
+ false
}
- } catch {
- case _: NoSuchTableException =>
- false
}
- }
- override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = {
- if (tableExists(newIdent)) {
- throw new TableAlreadyExistsException(newIdent)
- }
+ override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit =
+ withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ if (tableExists(newIdent)) {
+ throw new TableAlreadyExistsException(newIdent)
+ }
- // Load table to make sure the table exists
- loadTable(oldIdent)
- catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier)
- }
+ // Load table to make sure the table exists
+ loadTable(oldIdent)
+ catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier)
+ }
private def toOptions(properties: Map[String, String]): Map[String, String] = {
properties.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map {
@@ -273,70 +283,78 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
}
- override def listNamespaces(): Array[Array[String]] = {
- catalog.listDatabases().map(Array(_)).toArray
- }
-
- override def listNamespaces(namespace: Array[String]): Array[Array[String]] = {
- namespace match {
- case Array() =>
- listNamespaces()
- case Array(db) if catalog.databaseExists(db) =>
- Array()
- case _ =>
- throw new NoSuchNamespaceException(namespace)
+ override def listNamespaces(): Array[Array[String]] =
+ withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ catalog.listDatabases().map(Array(_)).toArray
}
- }
- override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] = {
- namespace match {
- case Array(db) =>
- try {
- catalog.getDatabaseMetadata(db).toMetadata
- } catch {
- case _: NoSuchDatabaseException =>
- throw new NoSuchNamespaceException(namespace)
- }
+ override def listNamespaces(namespace: Array[String]): Array[Array[String]] =
+ withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ namespace match {
+ case Array() =>
+ listNamespaces()
+ case Array(db) if catalog.databaseExists(db) =>
+ Array()
+ case _ =>
+ throw new NoSuchNamespaceException(namespace)
+ }
+ }
- case _ =>
- throw new NoSuchNamespaceException(namespace)
+ override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] =
+ withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ namespace match {
+ case Array(db) =>
+ try {
+ catalog.getDatabaseMetadata(db).toMetadata
+ } catch {
+ case _: NoSuchDatabaseException =>
+ throw new NoSuchNamespaceException(namespace)
+ }
+
+ case _ =>
+ throw new NoSuchNamespaceException(namespace)
+ }
}
- }
override def createNamespace(
namespace: Array[String],
- metadata: util.Map[String, String]): Unit = namespace match {
- case Array(db) if !catalog.databaseExists(db) =>
- catalog.createDatabase(
- toCatalogDatabase(db, metadata, defaultLocation = Some(catalog.getDefaultDBPath(db))),
- ignoreIfExists = false)
-
- case Array(_) =>
- throw new NamespaceAlreadyExistsException(namespace)
-
- case _ =>
- throw new IllegalArgumentException(s"Invalid namespace name: ${namespace.quoted}")
- }
-
- override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = {
- namespace match {
- case Array(db) =>
- // validate that this catalog's reserved properties are not removed
- changes.foreach {
- case remove: RemoveProperty if NAMESPACE_RESERVED_PROPERTIES.contains(remove.property) =>
- throw new UnsupportedOperationException(
- s"Cannot remove reserved property: ${remove.property}")
- case _ =>
- }
-
- val metadata = catalog.getDatabaseMetadata(db).toMetadata
- catalog.alterDatabase(
- toCatalogDatabase(db, CatalogV2Util.applyNamespaceChanges(metadata, changes)))
+ metadata: util.Map[String, String]): Unit =
+ withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ namespace match {
+ case Array(db) if !catalog.databaseExists(db) =>
+ catalog.createDatabase(
+ toCatalogDatabase(db, metadata, defaultLocation = Some(catalog.getDefaultDBPath(db))),
+ ignoreIfExists = false)
+
+ case Array(_) =>
+ throw new NamespaceAlreadyExistsException(namespace)
+
+ case _ =>
+ throw new IllegalArgumentException(s"Invalid namespace name: ${namespace.quoted}")
+ }
+ }
- case _ =>
- throw new NoSuchNamespaceException(namespace)
+ override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit =
+ withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ namespace match {
+ case Array(db) =>
+ // validate that this catalog's reserved properties are not removed
+ changes.foreach {
+ case remove: RemoveProperty
+ if NAMESPACE_RESERVED_PROPERTIES.contains(remove.property) =>
+ throw new UnsupportedOperationException(
+ s"Cannot remove reserved property: ${remove.property}")
+ case _ =>
+ }
+
+ val metadata = catalog.getDatabaseMetadata(db).toMetadata
+ catalog.alterDatabase(
+ toCatalogDatabase(db, CatalogV2Util.applyNamespaceChanges(metadata, changes)))
+
+ case _ =>
+ throw new NoSuchNamespaceException(namespace)
+ }
}
- }
/**
* List the metadata of partitions that belong to the specified table, assuming it exists, that
@@ -356,21 +374,24 @@ class HiveTableCatalog(sparkSession: SparkSession)
override def dropNamespace(
namespace: Array[String],
- cascade: Boolean): Boolean = namespace match {
- case Array(db) if catalog.databaseExists(db) =>
- if (catalog.listTables(db).nonEmpty && !cascade) {
- throw new IllegalStateException(s"Namespace ${namespace.quoted} is not empty")
+ cascade: Boolean): Boolean =
+ withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
+ namespace match {
+ case Array(db) if catalog.databaseExists(db) =>
+ if (catalog.listTables(db).nonEmpty && !cascade) {
+ throw new IllegalStateException(s"Namespace ${namespace.quoted} is not empty")
+ }
+ catalog.dropDatabase(db, ignoreIfNotExists = false, cascade)
+ true
+
+ case Array(_) =>
+ // exists returned false
+ false
+
+ case _ =>
+ throw new NoSuchNamespaceException(namespace)
}
- catalog.dropDatabase(db, ignoreIfNotExists = false, cascade)
- true
-
- case Array(_) =>
- // exists returned false
- false
-
- case _ =>
- throw new NoSuchNamespaceException(namespace)
- }
+ }
}
private object HiveTableCatalog {
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
index c1575018e..5e0be5ce6 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
@@ -164,7 +164,7 @@ class HiveCatalogSuite extends KyuubiHiveTest {
catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
- assert(parsed == Seq("db", "test_table") || parsed == Seq("spark_catalog", "db", "test_table"))
+ assert(parsed == Seq("db", "test_table"))
assert(table.schema == schema)
assert(filterV2TableProperties(table.properties) == Map())
@@ -181,7 +181,7 @@ class HiveCatalogSuite extends KyuubiHiveTest {
val table = catalog.createTable(testIdent, schema, Array.empty[Transform], properties)
val parsed = CatalystSqlParser.parseMultipartIdentifier(table.name)
- assert(parsed == Seq("db", "test_table") || parsed == Seq("spark_catalog", "db", "test_table"))
+ assert(parsed == Seq("db", "test_table"))
assert(table.schema == schema)
assert(filterV2TableProperties(table.properties).asJava == properties)