This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c79ebee1ff [HUDI-6924] Fix hoodie table config not work in table 
properties (#9836)
7c79ebee1ff is described below

commit 7c79ebee1ff1c9a0f5117252cb12fa2f03ac4b24
Author: Wechar Yu <[email protected]>
AuthorDate: Tue Oct 17 11:13:55 2023 +0800

    [HUDI-6924] Fix hoodie table config not work in table properties (#9836)
---
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  |  6 ++--
 .../apache/spark/sql/hudi/HoodieOptionConfig.scala | 37 +++++++++++-----------
 .../apache/spark/sql/hudi/TestCreateTable.scala    | 25 +++++++++++++++
 3 files changed, 47 insertions(+), 21 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index ee041e94b87..772dd27e279 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -234,10 +234,10 @@ class HoodieCatalogTable(val spark: SparkSession, var 
table: CatalogTable) exten
   private def parseSchemaAndConfigs(): (StructType, Map[String, String]) = {
     val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala.toMap
     val globalTableConfigs = 
mappingSparkDatasourceConfigsToTableConfigs(globalProps)
-    val globalSqlOptions = mapTableConfigsToSqlOptions(globalTableConfigs)
+    val globalSqlOptions = mapHoodieConfigsToSqlOptions(globalTableConfigs)
 
     val sqlOptions = withDefaultSqlOptions(globalSqlOptions ++
-      mapDataSourceWriteOptionsToSqlOptions(catalogProperties) ++ 
catalogProperties)
+      mapHoodieConfigsToSqlOptions(catalogProperties))
 
     // get final schema and parameters
     val (finalSchema, tableConfigs) = (table.tableType, hoodieTableExists) 
match {
@@ -265,7 +265,7 @@ class HoodieCatalogTable(val spark: SparkSession, var 
table: CatalogTable) exten
           s". The associated location('$tableLocation') already exists.")
     }
     HoodieOptionConfig.validateTable(spark, finalSchema,
-      mapTableConfigsToSqlOptions(tableConfigs))
+      mapHoodieConfigsToSqlOptions(tableConfigs))
 
     val resolver = spark.sessionState.conf.resolver
     val dataSchema = finalSchema.filterNot { f =>
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index 66c81ae331e..f98608176be 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi
 
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.avro.HoodieAvroUtils.getRootLevelFieldName
-import org.apache.hudi.common.model.HoodieRecordMerger
+import org.apache.hudi.common.model.{HoodieRecordMerger, HoodieTableType}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.util.ValidationUtils
 import org.apache.spark.sql.SparkSession
@@ -116,12 +116,12 @@ object HoodieOptionConfig {
   /**
    * Mapping of the short sql value to the hoodie's config value
    */
-  private val sqlOptionValueToWriteConfigValue: Map[String, String] = Map (
-    SQL_VALUE_TABLE_TYPE_COW -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
-    SQL_VALUE_TABLE_TYPE_MOR -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
+  private val sqlOptionValueToHoodieConfigValue: Map[String, String] = Map (
+    SQL_VALUE_TABLE_TYPE_COW -> HoodieTableType.COPY_ON_WRITE.name,
+    SQL_VALUE_TABLE_TYPE_MOR -> HoodieTableType.MERGE_ON_READ.name
   )
 
-  private lazy val writeConfigValueToSqlOptionValue = 
sqlOptionValueToWriteConfigValue.map(f => f._2 -> f._1)
+  private lazy val hoodieConfigValueToSqlOptionValue = 
sqlOptionValueToHoodieConfigValue.map(f => f._2 -> f._1)
 
   def withDefaultSqlOptions(options: Map[String, String]): Map[String, String] 
= defaultSqlOptions ++ options
 
@@ -130,14 +130,22 @@ object HoodieOptionConfig {
    */
   def mapSqlOptionsToDataSourceWriteConfigs(options: Map[String, String]): 
Map[String, String] = {
     options.map (kv =>
-      sqlOptionKeyToWriteConfigKey.getOrElse(kv._1, kv._1) -> 
sqlOptionValueToWriteConfigValue.getOrElse(kv._2, kv._2))
+      sqlOptionKeyToWriteConfigKey.getOrElse(kv._1, kv._1) -> 
sqlOptionValueToHoodieConfigValue.getOrElse(kv._2, kv._2))
   }
 
   /**
-   * Mapping the data source write configs to SQL options.
+   * Mapping the hoodie configs (including data source write configs and 
hoodie table configs) to SQL options.
    */
-  def mapDataSourceWriteOptionsToSqlOptions(options: Map[String, String]): 
Map[String, String] = {
-    options.map(kv => writeConfigKeyToSqlOptionKey.getOrElse(kv._1, kv._1) -> 
writeConfigValueToSqlOptionValue.getOrElse(kv._2, kv._2))
+  def mapHoodieConfigsToSqlOptions(options: Map[String, String]): Map[String, 
String] = {
+    options.map { case (k, v) =>
+      if (writeConfigKeyToSqlOptionKey.contains(k)) {
+        writeConfigKeyToSqlOptionKey(k) -> 
hoodieConfigValueToSqlOptionValue.getOrElse(v, v)
+      } else if (tableConfigKeyToSqlOptionKey.contains(k)) {
+        tableConfigKeyToSqlOptionKey(k) -> 
hoodieConfigValueToSqlOptionValue.getOrElse(v, v)
+      } else {
+        k -> v
+      }
+    }
   }
 
   /**
@@ -146,20 +154,13 @@ object HoodieOptionConfig {
   def mapSqlOptionsToTableConfigs(options: Map[String, String]): Map[String, 
String] = {
     options.map { case (k, v) =>
       if (sqlOptionKeyToTableConfigKey.contains(k)) {
-        sqlOptionKeyToTableConfigKey(k) -> 
sqlOptionValueToWriteConfigValue.getOrElse(v, v)
+        sqlOptionKeyToTableConfigKey(k) -> 
sqlOptionValueToHoodieConfigValue.getOrElse(v, v)
       } else {
         k -> v
       }
     }
   }
 
-  /**
-   * Map table configs to SQL options.
-   */
-  def mapTableConfigsToSqlOptions(options: Map[String, String]): Map[String, 
String] = {
-    options.map(kv => tableConfigKeyToSqlOptionKey.getOrElse(kv._1, kv._1) -> 
writeConfigValueToSqlOptionValue.getOrElse(kv._2, kv._2))
-  }
-
   val defaultSqlOptions: Map[String, String] = {
     HoodieOptionConfig.getClass.getDeclaredFields
       .filter(f => f.getType == classOf[HoodieSQLOption[_]])
@@ -199,7 +200,7 @@ object HoodieOptionConfig {
 
   // extract primaryKey, preCombineField, type options
   def extractSqlOptions(options: Map[String, String]): Map[String, String] = {
-    val sqlOptions = mapTableConfigsToSqlOptions(options)
+    val sqlOptions = mapHoodieConfigsToSqlOptions(options)
     val targetOptions = sqlOptionKeyToWriteConfigKey.keySet -- 
Set(SQL_PAYLOAD_CLASS.sqlKeyName) -- Set(SQL_RECORD_MERGER_STRATEGY.sqlKeyName) 
-- Set(SQL_PAYLOAD_TYPE.sqlKeyName)
     sqlOptions.filterKeys(targetOptions.contains)
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index f0fd5159450..e41ec8846b8 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -1461,4 +1461,29 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
       
assertResult(table.storage.outputFormat.get)("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")
     }
   }
+
+  test("Test Create Hoodie Table with table configs") {
+    Seq("COPY_ON_WRITE", "MERGE_ON_READ").foreach { tableType =>
+      withTable(generateTableName) { tableName =>
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | tblproperties (
+             |  hoodie.table.recordkey.fields ='id',
+             |  hoodie.table.type = '$tableType',
+             |  hoodie.table.precombine.field = 'ts'
+             | )
+       """.stripMargin)
+        val hoodieCatalogTable = HoodieCatalogTable(spark, 
TableIdentifier(tableName))
+        assertResult(Array("id"))(hoodieCatalogTable.primaryKeys)
+        assertResult(tableType)(hoodieCatalogTable.tableTypeName)
+        assertResult("ts")(hoodieCatalogTable.preCombineKey.get)
+      }
+    }
+  }
 }

Reply via email to