This is an automated email from the ASF dual-hosted git repository.

yikaifei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5915d682b [KYUUBI #5022] [KSHC] CreateTable should use the correct provider
5915d682b is described below

commit 5915d682b5ea07b96544cabced19264e736a450c
Author: yikaifei <[email protected]>
AuthorDate: Fri Jul 7 12:04:55 2023 +0800

    [KYUUBI #5022] [KSHC] CreateTable should use the correct provider
    
    ### _Why are the changes needed?_
    
    This PR fixes a bug in KSHC: `catalog.createTable` should use the correct provider.
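
    For example, with the KSHC catalog registered as `hive`, a Hive-style
    `STORED AS` table previously fell back to the session's default data
    source provider instead of the `hive` provider. A minimal sketch
    mirroring the new CreateTableSuite (the table name and columns are
    illustrative, not taken from this patch):

        spark.sql(
          """CREATE TABLE hive.default.employee (id STRING, year STRING)
            |STORED AS parquet
            |PARTITIONED BY (year)
            |""".stripMargin)

    With this change, `catalog.createTable` honors an explicit `USING`
    provider as-is, maps Hive serde options (`hive.stored-as`, `hive.serde`,
    `hive.input-format`, `hive.output-format`) to the `hive` provider with
    the corresponding storage format, and otherwise falls back according to
    `spark.sql.legacy.createHiveTableByDefault`.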
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    Closes #5022 from Yikf/KSHC-createTable.
    
    Closes #5022
    
    cd8cb1cf2 [yikaifei] CreateTable should use the correct provider
    
    Authored-by: yikaifei <[email protected]>
    Signed-off-by: yikaifei <[email protected]>
---
 .../spark/connector/hive/HiveTableCatalog.scala    | 82 ++++++++++++++++++++--
 .../spark/connector/hive/HiveCatalogSuite.scala    |  1 +
 .../spark/connector/hive/KyuubiHiveTest.scala      |  3 +-
 .../connector/hive/command/CreateTableSuite.scala  | 78 ++++++++++++++++++++
 4 files changed, 156 insertions(+), 8 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
index 2d816c8c4..41976b264 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala
@@ -36,15 +36,16 @@ import org.apache.spark.sql.catalyst.util.quoteIfNeeded
 import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange}
 import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty
 import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.hive.HiveUDFExpressionBuilder
 import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper._
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, GLOBAL_TEMP_DATABASE}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSQLConf
-import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper}
+import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{getStorageFormatAndProvider, toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper}
 import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature
 
 /**
@@ -173,11 +174,14 @@ class HiveTableCatalog(sparkSession: SparkSession)
     withSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") {
      import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.TransformHelper
      val (partitionColumns, maybeBucketSpec) = partitions.toSeq.convertTransforms
-      val provider = properties.getOrDefault(TableCatalog.PROP_PROVIDER, conf.defaultDataSourceName)
-      val tableProperties = properties.asScala
       val location = Option(properties.get(TableCatalog.PROP_LOCATION))
-      val storage = DataSource.buildStorageFormatFromOptions(toOptions(tableProperties.toMap))
-        .copy(locationUri = location.map(CatalogUtils.stringToURI))
+      val maybeProvider = Option(properties.get(TableCatalog.PROP_PROVIDER))
+      val (storage, provider) =
+        getStorageFormatAndProvider(
+          maybeProvider,
+          location,
+          properties.asScala.toMap)
+      val tableProperties = properties.asScala
       val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL)
       val tableType =
         if (isExternal || location.isDefined) {
@@ -394,7 +398,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
     }
 }
 
-private object HiveTableCatalog {
+private object HiveTableCatalog extends Logging {
   private def toCatalogDatabase(
       db: String,
       metadata: util.Map[String, String],
@@ -410,6 +414,70 @@ private object HiveTableCatalog {
         Seq(SupportsNamespaces.PROP_COMMENT, SupportsNamespaces.PROP_LOCATION))
   }
 
+  private def getStorageFormatAndProvider(
+      provider: Option[String],
+      location: Option[String],
+      options: Map[String, String]): (CatalogStorageFormat, String) = {
+    val nonHiveStorageFormat = CatalogStorageFormat.empty.copy(
+      locationUri = location.map(CatalogUtils.stringToURI),
+      properties = options)
+
+    val conf = SQLConf.get
+    val defaultHiveStorage = HiveSerDe.getDefaultStorage(conf).copy(
+      locationUri = location.map(CatalogUtils.stringToURI),
+      properties = options)
+
+    if (provider.isDefined) {
+      (nonHiveStorageFormat, provider.get)
+    } else if (serdeIsDefined(options)) {
+      val maybeSerde = options.get("hive.serde")
+      val maybeStoredAs = options.get("hive.stored-as")
+      val maybeInputFormat = options.get("hive.input-format")
+      val maybeOutputFormat = options.get("hive.output-format")
+      val storageFormat = if (maybeStoredAs.isDefined) {
+        // If `STORED AS fileFormat` is used, infer inputFormat, outputFormat and serde from it.
+        HiveSerDe.sourceToSerDe(maybeStoredAs.get) match {
+          case Some(hiveSerde) =>
+            defaultHiveStorage.copy(
+              inputFormat = hiveSerde.inputFormat.orElse(defaultHiveStorage.inputFormat),
+              outputFormat = hiveSerde.outputFormat.orElse(defaultHiveStorage.outputFormat),
+              // User specified serde takes precedence over the one inferred from file format.
+              serde = maybeSerde.orElse(hiveSerde.serde).orElse(defaultHiveStorage.serde),
+              properties = options ++ defaultHiveStorage.properties)
+          case _ => throw KyuubiHiveConnectorException(s"Unsupported serde ${maybeSerde.get}.")
+        }
+      } else {
+        defaultHiveStorage.copy(
+          inputFormat =
+            maybeInputFormat.orElse(defaultHiveStorage.inputFormat),
+          outputFormat =
+            maybeOutputFormat.orElse(defaultHiveStorage.outputFormat),
+          serde = maybeSerde.orElse(defaultHiveStorage.serde),
+          properties = options ++ defaultHiveStorage.properties)
+      }
+      (storageFormat, DDLUtils.HIVE_PROVIDER)
+    } else {
+      val createHiveTableByDefault = conf.getConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT)
+      if (!createHiveTableByDefault) {
+        (nonHiveStorageFormat, conf.defaultDataSourceName)
+      } else {
+        logWarning("A Hive serde table will be created as there is no table provider " +
+          s"specified. You can set ${SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT.key} to false " +
+          "so that native data source table will be created instead.")
+        (defaultHiveStorage, DDLUtils.HIVE_PROVIDER)
+      }
+    }
+  }
+
+  private def serdeIsDefined(options: Map[String, String]): Boolean = {
+    val maybeStoredAs = options.get("hive.stored-as")
+    val maybeInputFormat = options.get("hive.input-format")
+    val maybeOutputFormat = options.get("hive.output-format")
+    val maybeSerde = options.get("hive.serde")
+    maybeStoredAs.isDefined || maybeInputFormat.isDefined ||
+    maybeOutputFormat.isDefined || maybeSerde.isDefined
+  }
+
   implicit class NamespaceHelper(namespace: Array[String]) {
     def quoted: String = namespace.map(quoteIfNeeded).mkString(".")
   }
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
index 4719544de..f43dafd11 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala
@@ -220,6 +220,7 @@ class HiveCatalogSuite extends KyuubiHiveTest {
 
   test("createTable: location") {
     val properties = new util.HashMap[String, String]()
+    properties.put(TableCatalog.PROP_PROVIDER, "parquet")
     assert(!catalog.tableExists(testIdent))
 
     // default location
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
index d0b17dc05..400afdb3e 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala
@@ -35,7 +35,8 @@ abstract class KyuubiHiveTest extends QueryTest with Logging {
       TableCatalog.PROP_PROVIDER,
       TableCatalog.PROP_OWNER,
       TableCatalog.PROP_EXTERNAL,
-      TableCatalog.PROP_IS_MANAGED_LOCATION)
+      TableCatalog.PROP_IS_MANAGED_LOCATION,
+      "transient_lastDdlTime")
 
   protected val NAMESPACE_RESERVED_PROPERTIES =
     Seq(
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateTableSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateTableSuite.scala
new file mode 100644
index 000000000..d26ec4209
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/command/CreateTableSuite.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.spark.connector.hive.command
+
+import org.apache.spark.sql.connector.catalog.Identifier
+
+import org.apache.kyuubi.spark.connector.hive.{HiveTable, HiveTableCatalog}
+import org.apache.kyuubi.spark.connector.hive.command.DDLCommandTestUtils.V2_COMMAND_VERSION
+
+class CreateTableSuite extends DDLCommandTestUtils {
+
+  override protected def command: String = "CREATE TABLE"
+
+  override protected def catalogVersion: String = "Hive V2"
+
+  override protected def commandVersion: String = V2_COMMAND_VERSION
+
+  test("Create datasource table") {
+    val hiveCatalog = spark.sessionState.catalogManager
+      .catalog(catalogName).asInstanceOf[HiveTableCatalog]
+    val table = "hive.default.employee"
+    Seq("parquet", "orc").foreach { provider =>
+      withTable(table) {
+        sql(
+          s"""
+             | CREATE TABLE IF NOT EXISTS
+             | $table (id String, year String, month string)
+             | USING $provider
+             | PARTITIONED BY (year, month)
+             |""".stripMargin).collect()
+        val employee = Identifier.of(Array("default"), "employee")
+        val loadTable = hiveCatalog.loadTable(employee)
+        assert(loadTable.isInstanceOf[HiveTable])
+        val catalogTable = loadTable.asInstanceOf[HiveTable].catalogTable
+        assert(catalogTable.provider.isDefined)
+        assert(catalogTable.provider.get.equalsIgnoreCase(provider))
+      }
+    }
+  }
+
+  test("Create hive table") {
+    val hiveCatalog = spark.sessionState.catalogManager
+      .catalog(catalogName).asInstanceOf[HiveTableCatalog]
+    val table = "hive.default.employee"
+    Seq("parquet", "orc").foreach { provider =>
+      withTable(table) {
+        sql(
+          s"""
+             | CREATE TABLE IF NOT EXISTS
+             | $table (id String, year String, month string)
+             | STORED AS $provider
+             | PARTITIONED BY (year, month)
+             |""".stripMargin).collect()
+        val employee = Identifier.of(Array("default"), "employee")
+        val loadTable = hiveCatalog.loadTable(employee)
+        assert(loadTable.isInstanceOf[HiveTable])
+        val catalogTable = loadTable.asInstanceOf[HiveTable].catalogTable
+        assert(catalogTable.provider.isDefined)
+        assert(catalogTable.provider.get.equalsIgnoreCase("hive"))
+        assert(catalogTable.storage.serde.getOrElse("Unknown").contains(provider))
+      }
+    }
+  }
+}
