This is an automated email from the ASF dual-hosted git repository.
yikaifei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 5915d682b [KYUUBI #5022] [KSHC] CreateTable should use the correct provider
5915d682b is described below
commit 5915d682b5ea07b96544cabced19264e736a450c
Author: yikaifei <[email protected]>
AuthorDate: Fri Jul 7 12:04:55 2023 +0800
[KYUUBI #5022] [KSHC] CreateTable should use the correct provider
### _Why are the changes needed?_
This PR aims to fix a bug: in KSHC, `catalog.createTable` should use the correct table provider instead of unconditionally falling back to the session's default data source.
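For context, here is a minimal sketch (not part of this patch) of the two CREATE TABLE paths the fix distinguishes. The local session setup, the catalog registration, and the table names are illustrative assumptions; the expected providers mirror the new `CreateTableSuite` below.

```scala
// Sketch only: assumes the KSHC connector jar is on the classpath and that
// registering HiveTableCatalog under the name `hive` works as in the tests.
import org.apache.spark.sql.SparkSession

object ProviderResolutionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .config(
        "spark.sql.catalog.hive",
        "org.apache.kyuubi.spark.connector.hive.HiveTableCatalog")
      .getOrCreate()

    // Datasource syntax: the stored provider should be `parquet`.
    spark.sql("CREATE TABLE hive.default.src (id STRING) USING parquet")

    // Hive serde syntax: the stored provider should be `hive`, with the
    // input/output format and serde inferred from STORED AS. Before this fix,
    // a missing provider always fell back to conf.defaultDataSourceName,
    // even for Hive serde tables.
    spark.sql("CREATE TABLE hive.default.src_hive (id STRING) STORED AS parquet")

    spark.stop()
  }
}
```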
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
Closes #5022 from Yikf/KSHC-createTable.
Closes #5022
cd8cb1cf2 [yikaifei] CreateTable should use the correct provider
Authored-by: yikaifei <[email protected]>
Signed-off-by: yikaifei <[email protected]>
---
.../spark/connector/hive/HiveTableCatalog.scala | 82 ++++++++++++++++++++--
.../spark/connector/hive/HiveCatalogSuite.scala | 1 +
.../spark/connector/hive/KyuubiHiveTest.scala | 3 +-
.../connector/hive/command/CreateTableSuite.scala | 78 ++++++++++++++++++++
4 files changed, 156 insertions(+), 8 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
index 2d816c8c4..41976b264 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -36,15 +36,16 @@ import org.apache.spark.sql.catalyst.util.quoteIfNeeded
import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange}
import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.hive.HiveUDFExpressionBuilder
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, GLOBAL_TEMP_DATABASE}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSQLConf
-import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper}
+import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{getStorageFormatAndProvider, toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper}
import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature
/**
@@ -173,11 +174,14 @@ class HiveTableCatalog(sparkSession: SparkSession)
withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.TransformHelper
val (partitionColumns, maybeBucketSpec) = partitions.toSeq.convertTransforms
- val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName)
- val tableProperties = properties.asScala
val location = Option(properties.get(TableCatalog.PROP_LOCATION))
- val storage = DataSource.buildStorageFormatFromOptions(toOptions(tableProperties.toMap))
- .copy(locationUri = location.map(CatalogUtils.stringToURI))
+ val maybeProvider = Option(properties.get(TableCatalog.PROP_PROVIDER))
+ val (storage, provider) =
+ getStorageFormatAndProvider(
+ maybeProvider,
+ location,
+ properties.asScala.toMap)
+ val tableProperties = properties.asScala
val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL)
val tableType =
if (isExternal || location.isDefined) {
@@ -394,7 +398,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
}
}
-private object HiveTableCatalog {
+private object HiveTableCatalog extends Logging {
private def toCatalogDatabase(
db: String,
metadata: util.Map[String, String],
@@ -410,6 +414,70 @@ private object HiveTableCatalog {
Seq(SupportsNamespaces.PROP_COMMENT, SupportsNamespaces.PROP_LOCATION))
}
+ private def getStorageFormatAndProvider(
+ provider: Option[String],
+ location: Option[String],
+ options: Map[String, String]): (CatalogStorageFormat, String) = {
+ val nonHiveStorageFormat = CatalogStorageFormat.empty.copy(
+ locationUri = location.map(CatalogUtils.stringToURI),
+ properties = options)
+
+ val conf = SQLConf.get
+ val defaultHiveStorage = HiveSerDe.getDefaultStorage(conf).copy(
+ locationUri = location.map(CatalogUtils.stringToURI),
+ properties = options)
+
+ if (provider.isDefined) {
+ (nonHiveStorageFormat, provider.get)
+ } else if (serdeIsDefined(options)) {
+ val maybeSerde = options.get("hive.serde")
+ val maybeStoredAs = options.get("hive.stored-as")
+ val maybeInputFormat = options.get("hive.input-format")
+ val maybeOutputFormat = options.get("hive.output-format")
+ val storageFormat = if (maybeStoredAs.isDefined) {
+ // If `STORED AS fileFormat` is used, infer inputFormat, outputFormat and serde from it.
+ HiveSerDe.sourceToSerDe(maybeStoredAs.get) match {
+ case Some(hiveSerde) =>
+ defaultHiveStorage.copy(
+ inputFormat = hiveSerde.inputFormat.orElse(defaultHiveStorage.inputFormat),
+ outputFormat = hiveSerde.outputFormat.orElse(defaultHiveStorage.outputFormat),
+ // User specified serde takes precedence over the one inferred from file format.
+ serde = maybeSerde.orElse(hiveSerde.serde).orElse(defaultHiveStorage.serde),
+ properties = options ++ defaultHiveStorage.properties)
+ case _ => throw KyuubiHiveConnectorException(s"Unsupported serde ${maybeSerde.get}.")
+ }
+ } else {
+ defaultHiveStorage.copy(
+ inputFormat =
+ maybeInputFormat.orElse(defaultHiveStorage.inputFormat),
+ outputFormat =
+ maybeOutputFormat.orElse(defaultHiveStorage.outputFormat),
+ serde = maybeSerde.orElse(defaultHiveStorage.serde),
+ properties = options ++ defaultHiveStorage.properties)
+ }
+ (storageFormat, DDLUtils.HIVE_PROVIDER)
+ } else {
+ val createHiveTableByDefault = conf.getConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT)
+ if (!createHiveTableByDefault) {
+ (nonHiveStorageFormat, conf.defaultDataSourceName)
+ } else {
+ logWarning("A Hive serde table will be created as there is no table provider " +
+ s"specified. You can set
${SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key} to false " +
+ "so that native data source table will be created instead.")
+ (defaultHiveStorage, DDLUtils.HIVE_PROVIDER)
+ }
+ }
+ }
+
+ private def serdeIsDefined(options: Map[String, String]): Boolean = {
+ val maybeStoredAs = options.get("hive.stored-as")
+ val maybeInputFormat = options.get("hive.input-format")
+ val maybeOutputFormat = options.get("hive.output-format")
+ val maybeSerde = options.get("hive.serde")
+ maybeStoredAs.isDefined || maybeInputFormat.isDefined ||
+ maybeOutputFormat.isDefined || maybeSerde.isDefined
+ }
+
implicit class NamespaceHelper(namespace: Array[String]) {
def quoted: String = namespace.map(quoteIfNeeded).mkString(".")
}
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
index 4719544de..f43dafd11 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
@@ -220,6 +220,7 @@ class HiveCatalogSuite extends KyuubiHiveTest {
test("createTable: location") {
val properties = new util.HashMap[String, String]()
+ properties.put(TableCatalog.PROP_PROVIDER, "parquet")
assert(!catalog.tableExists(testIdent))
// default location
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
index d0b17dc05..400afdb3e 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
@@ -35,7 +35,8 @@ abstract class KyuubiHiveTest extends QueryTest with Logging {
TableCatalog.PROP_PROVIDER,
TableCatalog.PROP_OWNER,
TableCatalog.PROP_EXTERNAL,
- TableCatalog.PROP_IS_MANAGED_LOCATION)
+ TableCatalog.PROP_IS_MANAGED_LOCATION,
+ "transient_lastDdlTime")
protected val NAMESPACE_RESERVED_PROPERTIES =
Seq(
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateTableSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateTableSuite.scala
new file mode 100644
index 000000000..d26ec4209
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateTableSuite.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.spark.connector.hive.command
+
+import org.apache.spark.sql.connector.catalog.Identifier
+
+import org.apache.kyuubi.spark.connector.hive.{HiveTable, HiveTableCatalog}
+import org.apache.kyuubi.spark.connector.hive.command.DDLCommandTestUtils.V2_COMMAND_VERSION
+
+class CreateTableSuite extends DDLCommandTestUtils {
+
+ override protected def command: String = "CREATE TABLE"
+
+ override protected def catalogVersion: String = "Hive V2"
+
+ override protected def commandVersion: String = V2_COMMAND_VERSION
+
+ test("Create datasource table") {
+ val hiveCatalog = spark.sessionState.catalogManager
+ .catalog(catalogName).asInstanceOf[HiveTableCatalog]
+ val table = "hive.default.employee"
+ Seq("parquet", "orc").foreach { provider =>
+ withTable(table) {
+ sql(
+ s"""
+ | CREATE TABLE IF NOT EXISTS
+ | $table (id String, year String, month string)
+ | USING $provider
+ | PARTITIONED BY (year, month)
+ |""".stripMargin).collect()
+ val employee = Identifier.of(Array("default"), "employee")
+ val loadTable = hiveCatalog.loadTable(employee)
+ assert(loadTable.isInstanceOf[HiveTable])
+ val catalogTable = loadTable.asInstanceOf[HiveTable].catalogTable
+ assert(catalogTable.provider.isDefined)
+ assert(catalogTable.provider.get.equalsIgnoreCase(provider))
+ }
+ }
+ }
+
+ test("Create hive table") {
+ val hiveCatalog = spark.sessionState.catalogManager
+ .catalog(catalogName).asInstanceOf[HiveTableCatalog]
+ val table = "hive.default.employee"
+ Seq("parquet", "orc").foreach { provider =>
+ withTable(table) {
+ sql(
+ s"""
+ | CREATE TABLE IF NOT EXISTS
+ | $table (id String, year String, month string)
+ | STORED AS $provider
+ | PARTITIONED BY (year, month)
+ |""".stripMargin).collect()
+ val employee = Identifier.of(Array("default"), "employee")
+ val loadTable = hiveCatalog.loadTable(employee)
+ assert(loadTable.isInstanceOf[HiveTable])
+ val catalogTable = loadTable.asInstanceOf[HiveTable].catalogTable
+ assert(catalogTable.provider.isDefined)
+ assert(catalogTable.provider.get.equalsIgnoreCase("hive"))
+
+ assert(catalogTable.storage.serde.getOrElse("Unknown").contains(provider))
+ }
+ }
+ }
+}