This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7c79ebee1ff [HUDI-6924] Fix hoodie table config not work in table
properties (#9836)
7c79ebee1ff is described below
commit 7c79ebee1ff1c9a0f5117252cb12fa2f03ac4b24
Author: Wechar Yu <[email protected]>
AuthorDate: Tue Oct 17 11:13:55 2023 +0800
[HUDI-6924] Fix hoodie table config not work in table properties (#9836)
---
.../sql/catalyst/catalog/HoodieCatalogTable.scala | 6 ++--
.../apache/spark/sql/hudi/HoodieOptionConfig.scala | 37 +++++++++++-----------
.../apache/spark/sql/hudi/TestCreateTable.scala | 25 +++++++++++++++
3 files changed, 47 insertions(+), 21 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index ee041e94b87..772dd27e279 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -234,10 +234,10 @@ class HoodieCatalogTable(val spark: SparkSession, var
table: CatalogTable) exten
private def parseSchemaAndConfigs(): (StructType, Map[String, String]) = {
val globalProps = DFSPropertiesConfiguration.getGlobalProps.asScala.toMap
val globalTableConfigs =
mappingSparkDatasourceConfigsToTableConfigs(globalProps)
- val globalSqlOptions = mapTableConfigsToSqlOptions(globalTableConfigs)
+ val globalSqlOptions = mapHoodieConfigsToSqlOptions(globalTableConfigs)
val sqlOptions = withDefaultSqlOptions(globalSqlOptions ++
- mapDataSourceWriteOptionsToSqlOptions(catalogProperties) ++
catalogProperties)
+ mapHoodieConfigsToSqlOptions(catalogProperties))
// get final schema and parameters
val (finalSchema, tableConfigs) = (table.tableType, hoodieTableExists)
match {
@@ -265,7 +265,7 @@ class HoodieCatalogTable(val spark: SparkSession, var
table: CatalogTable) exten
s". The associated location('$tableLocation') already exists.")
}
HoodieOptionConfig.validateTable(spark, finalSchema,
- mapTableConfigsToSqlOptions(tableConfigs))
+ mapHoodieConfigsToSqlOptions(tableConfigs))
val resolver = spark.sessionState.conf.resolver
val dataSchema = finalSchema.filterNot { f =>
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index 66c81ae331e..f98608176be 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.avro.HoodieAvroUtils.getRootLevelFieldName
-import org.apache.hudi.common.model.HoodieRecordMerger
+import org.apache.hudi.common.model.{HoodieRecordMerger, HoodieTableType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.ValidationUtils
import org.apache.spark.sql.SparkSession
@@ -116,12 +116,12 @@ object HoodieOptionConfig {
/**
* Mapping of the short sql value to the hoodie's config value
*/
- private val sqlOptionValueToWriteConfigValue: Map[String, String] = Map (
- SQL_VALUE_TABLE_TYPE_COW -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
- SQL_VALUE_TABLE_TYPE_MOR -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL
+ private val sqlOptionValueToHoodieConfigValue: Map[String, String] = Map (
+ SQL_VALUE_TABLE_TYPE_COW -> HoodieTableType.COPY_ON_WRITE.name,
+ SQL_VALUE_TABLE_TYPE_MOR -> HoodieTableType.MERGE_ON_READ.name
)
- private lazy val writeConfigValueToSqlOptionValue =
sqlOptionValueToWriteConfigValue.map(f => f._2 -> f._1)
+ private lazy val hoodieConfigValueToSqlOptionValue =
sqlOptionValueToHoodieConfigValue.map(f => f._2 -> f._1)
def withDefaultSqlOptions(options: Map[String, String]): Map[String, String]
= defaultSqlOptions ++ options
@@ -130,14 +130,22 @@ object HoodieOptionConfig {
*/
def mapSqlOptionsToDataSourceWriteConfigs(options: Map[String, String]):
Map[String, String] = {
options.map (kv =>
- sqlOptionKeyToWriteConfigKey.getOrElse(kv._1, kv._1) ->
sqlOptionValueToWriteConfigValue.getOrElse(kv._2, kv._2))
+ sqlOptionKeyToWriteConfigKey.getOrElse(kv._1, kv._1) ->
sqlOptionValueToHoodieConfigValue.getOrElse(kv._2, kv._2))
}
/**
- * Mapping the data source write configs to SQL options.
+ * Mapping the hoodie configs (including data source write configs and
hoodie table configs) to SQL options.
*/
- def mapDataSourceWriteOptionsToSqlOptions(options: Map[String, String]):
Map[String, String] = {
- options.map(kv => writeConfigKeyToSqlOptionKey.getOrElse(kv._1, kv._1) ->
writeConfigValueToSqlOptionValue.getOrElse(kv._2, kv._2))
+ def mapHoodieConfigsToSqlOptions(options: Map[String, String]): Map[String,
String] = {
+ options.map { case (k, v) =>
+ if (writeConfigKeyToSqlOptionKey.contains(k)) {
+ writeConfigKeyToSqlOptionKey(k) ->
hoodieConfigValueToSqlOptionValue.getOrElse(v, v)
+ } else if (tableConfigKeyToSqlOptionKey.contains(k)) {
+ tableConfigKeyToSqlOptionKey(k) ->
hoodieConfigValueToSqlOptionValue.getOrElse(v, v)
+ } else {
+ k -> v
+ }
+ }
}
/**
@@ -146,20 +154,13 @@ object HoodieOptionConfig {
def mapSqlOptionsToTableConfigs(options: Map[String, String]): Map[String,
String] = {
options.map { case (k, v) =>
if (sqlOptionKeyToTableConfigKey.contains(k)) {
- sqlOptionKeyToTableConfigKey(k) ->
sqlOptionValueToWriteConfigValue.getOrElse(v, v)
+ sqlOptionKeyToTableConfigKey(k) ->
sqlOptionValueToHoodieConfigValue.getOrElse(v, v)
} else {
k -> v
}
}
}
- /**
- * Map table configs to SQL options.
- */
- def mapTableConfigsToSqlOptions(options: Map[String, String]): Map[String,
String] = {
- options.map(kv => tableConfigKeyToSqlOptionKey.getOrElse(kv._1, kv._1) ->
writeConfigValueToSqlOptionValue.getOrElse(kv._2, kv._2))
- }
-
val defaultSqlOptions: Map[String, String] = {
HoodieOptionConfig.getClass.getDeclaredFields
.filter(f => f.getType == classOf[HoodieSQLOption[_]])
@@ -199,7 +200,7 @@ object HoodieOptionConfig {
// extract primaryKey, preCombineField, type options
def extractSqlOptions(options: Map[String, String]): Map[String, String] = {
- val sqlOptions = mapTableConfigsToSqlOptions(options)
+ val sqlOptions = mapHoodieConfigsToSqlOptions(options)
val targetOptions = sqlOptionKeyToWriteConfigKey.keySet --
Set(SQL_PAYLOAD_CLASS.sqlKeyName) -- Set(SQL_RECORD_MERGER_STRATEGY.sqlKeyName)
-- Set(SQL_PAYLOAD_TYPE.sqlKeyName)
sqlOptions.filterKeys(targetOptions.contains)
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index f0fd5159450..e41ec8846b8 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -1461,4 +1461,29 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
assertResult(table.storage.outputFormat.get)("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")
}
}
+
+ test("Test Create Hoodie Table with table configs") {
+ Seq("COPY_ON_WRITE", "MERGE_ON_READ").foreach { tableType =>
+ withTable(generateTableName) { tableName =>
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (
+ | hoodie.table.recordkey.fields ='id',
+ | hoodie.table.type = '$tableType',
+ | hoodie.table.precombine.field = 'ts'
+ | )
+ """.stripMargin)
+ val hoodieCatalogTable = HoodieCatalogTable(spark,
TableIdentifier(tableName))
+ assertResult(Array("id"))(hoodieCatalogTable.primaryKeys)
+ assertResult(tableType)(hoodieCatalogTable.tableTypeName)
+ assertResult("ts")(hoodieCatalogTable.preCombineKey.get)
+ }
+ }
+ }
}