This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2b3a85528a [HUDI-3889] Do not validate table config if save mode is set to Overwrite (#5619)
2b3a85528a is described below
commit 2b3a85528a32405bad8aee16f19a849322b517b5
Author: xi chaomin <[email protected]>
AuthorDate: Fri Jun 10 07:23:51 2022 +0800
[HUDI-3889] Do not validate table config if save mode is set to Overwrite (#5619)
Co-authored-by: xicm <[email protected]>
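For context, the user-visible effect of this change is that a Spark datasource write in Overwrite mode no longer fails when its options conflict with the existing table's configuration, while Append mode still raises a "Config conflict" HoodieException. The sketch below is illustrative only and is not code from this commit; the SparkSession setup, column names, table name, and base path are placeholders.

    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder().master("local[1]").appName("hudi-overwrite-sketch").getOrCreate()
    import spark.implicits._

    val basePath = "/tmp/hoodie_foo_tbl"   // hypothetical path, not from the commit
    val df = Seq(("id-1", 1L, "p1")).toDF("uuid", "ts", "partition")

    // Initial write creates the table with "uuid" as the record key.
    df.write.format("hudi")
      .option("hoodie.table.name", "hoodie_foo_tbl")
      .option("hoodie.datasource.write.recordkey.field", "uuid")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.partitionpath.field", "partition")
      .mode(SaveMode.Overwrite)
      .save(basePath)

    // Re-writing the same path with a different record key in Append mode still fails
    // table config validation; with this change, Overwrite mode skips the validation
    // and the write succeeds.
    df.write.format("hudi")
      .option("hoodie.table.name", "hoodie_foo_tbl")
      .option("hoodie.datasource.write.recordkey.field", "ts")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.partitionpath.field", "partition")
      .mode(SaveMode.Overwrite)
      .save(basePath)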
---
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 12 ++---
.../scala/org/apache/hudi/HoodieWriterUtils.scala | 63 ++++++++++++----------
.../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 23 ++++++++
3 files changed, 65 insertions(+), 33 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 84280559e9..fe4391c0a5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -83,9 +83,9 @@ object HoodieSparkSqlWriter {
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
     var tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
-    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig)
+    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)
-    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
+    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
     val originKeyGeneratorClassName = HoodieWriterUtils.getOriginKeyGenerator(parameters)
     val timestampKeyGeneratorConfigs = extractConfigsRelatedToTimestampBasedKeyGenerator(
       originKeyGeneratorClassName, parameters)
@@ -408,9 +408,9 @@ object HoodieSparkSqlWriter {
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
     val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
-    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig)
+    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)
-    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig)
+    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
     val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
     val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
     val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
@@ -734,14 +734,14 @@ object HoodieSparkSqlWriter {
   }

   private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
-      tableConfig: HoodieTableConfig): (Map[String, String], HoodieConfig) = {
+      tableConfig: HoodieTableConfig, mode: SaveMode): (Map[String, String], HoodieConfig) = {
     val translatedOptions = DataSourceWriteOptions.translateSqlOptions(optParams)
     val mergedParams = mutable.Map.empty ++ HoodieWriterUtils.parametersWithWriteDefaults(translatedOptions)
     if (!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)
       && mergedParams.contains(KEYGENERATOR_CLASS_NAME.key)) {
       mergedParams(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = mergedParams(KEYGENERATOR_CLASS_NAME.key)
     }
-    if (null != tableConfig) {
+    if (null != tableConfig && mode != SaveMode.Overwrite) {
       tableConfig.getProps.foreach { case (key, value) =>
         mergedParams(key) = value
       }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 63f1a7afc2..4967212675 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -118,47 +118,56 @@ object HoodieWriterUtils {
     }
   }
+  def validateTableConfig(spark: SparkSession, params: Map[String, String],
+                          tableConfig: HoodieConfig): Unit = {
+    validateTableConfig(spark, params, tableConfig, false)
+  }
+
   /**
    * Detects conflicts between new parameters and existing table configurations
    */
   def validateTableConfig(spark: SparkSession, params: Map[String, String],
-                          tableConfig: HoodieConfig): Unit = {
-    val resolver = spark.sessionState.conf.resolver
-    val diffConfigs = StringBuilder.newBuilder
-    params.foreach { case (key, value) =>
-      val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key)
-      if (null != existingValue && !resolver(existingValue, value)) {
-        diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
+                          tableConfig: HoodieConfig, isOverWriteMode: Boolean): Unit = {
+    // If Overwrite is set as save mode, we don't need to do table config validation.
+    if (!isOverWriteMode) {
+      val resolver = spark.sessionState.conf.resolver
+      val diffConfigs = StringBuilder.newBuilder
+      params.foreach { case (key, value) =>
+        val existingValue = getStringFromTableConfigWithAlternatives(tableConfig, key)
+        if (null != existingValue && !resolver(existingValue, value)) {
+          diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
+        }
       }
-    }
-    if (null != tableConfig) {
-      val datasourceRecordKey = params.getOrElse(RECORDKEY_FIELD.key(), null)
-      val tableConfigRecordKey = tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)
-      if (null != datasourceRecordKey && null != tableConfigRecordKey
+      if (null != tableConfig) {
+        val datasourceRecordKey = params.getOrElse(RECORDKEY_FIELD.key(), null)
+        val tableConfigRecordKey = tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)
+        if (null != datasourceRecordKey && null != tableConfigRecordKey
           && datasourceRecordKey != tableConfigRecordKey) {
-        diffConfigs.append(s"RecordKey:\t$datasourceRecordKey\t$tableConfigRecordKey\n")
-      }
+          diffConfigs.append(s"RecordKey:\t$datasourceRecordKey\t$tableConfigRecordKey\n")
+        }
-      val datasourcePreCombineKey = params.getOrElse(PRECOMBINE_FIELD.key(), null)
-      val tableConfigPreCombineKey = tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD)
-      if (null != datasourcePreCombineKey && null != tableConfigPreCombineKey
+        val datasourcePreCombineKey = params.getOrElse(PRECOMBINE_FIELD.key(), null)
+        val tableConfigPreCombineKey = tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD)
+        if (null != datasourcePreCombineKey && null != tableConfigPreCombineKey
           && datasourcePreCombineKey != tableConfigPreCombineKey) {
-        diffConfigs.append(s"PreCombineKey:\t$datasourcePreCombineKey\t$tableConfigPreCombineKey\n")
-      }
+          diffConfigs.append(s"PreCombineKey:\t$datasourcePreCombineKey\t$tableConfigPreCombineKey\n")
+        }
-      val datasourceKeyGen = getOriginKeyGenerator(params)
-      val tableConfigKeyGen = tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
-      if (null != datasourceKeyGen && null != tableConfigKeyGen
+        val datasourceKeyGen = getOriginKeyGenerator(params)
+        val tableConfigKeyGen = tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
+        if (null != datasourceKeyGen && null != tableConfigKeyGen
           && datasourceKeyGen != tableConfigKeyGen) {
-        diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
+          diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
+        }
       }
-    }
-    if (diffConfigs.nonEmpty) {
-      diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
-      throw new HoodieException(diffConfigs.toString.trim)
+      if (diffConfigs.nonEmpty) {
+        diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting value):\n")
+        throw new HoodieException(diffConfigs.toString.trim)
+      }
     }
+
     // Check schema evolution for bootstrap table.
     // now we do not support bootstrap table.
     if (params.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 339dbb5c71..928b1b1a1e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -272,6 +272,29 @@ class TestHoodieSparkSqlWriter {
     assert(tableAlreadyExistException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl"))
   }

+  /**
+   * Test case for: do not validate table config if save mode is set to Overwrite.
+   */
+  @Test
+  def testValidateTableConfigWithOverwriteSaveMode(): Unit = {
+    // create a new table
+    val tableModifier1 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      "hoodie.datasource.write.recordkey.field" -> "uuid")
+    val dataFrame = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier1, dataFrame)
+
+    // on the same path, a write with a different RECORDKEY_FIELD_NAME and Append save mode should throw an exception
+    val tableModifier2 = Map("path" -> tempBasePath, HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      "hoodie.datasource.write.recordkey.field" -> "ts")
+    val dataFrame2 = spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new Date().getTime)))
+    val hoodieException = intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableModifier2, dataFrame2))
+    assert(hoodieException.getMessage.contains("Config conflict"))
+    assert(hoodieException.getMessage.contains(s"RecordKey:\tts\tuuid"))
+
+    // on the same path, a write with a different RECORDKEY_FIELD_NAME and Overwrite save mode should succeed
+    assert(HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier2, dataFrame2)._1)
+  }
+
   /**
    * Test case for each bulk insert sort mode
    *