This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b3a85528a [HUDI-3889] Do not validate table config if save mode is set to Overwrite (#5619)
2b3a85528a is described below

commit 2b3a85528a32405bad8aee16f19a849322b517b5
Author: xi chaomin <[email protected]>
AuthorDate: Fri Jun 10 07:23:51 2022 +0800

    [HUDI-3889] Do not validate table config if save mode is set to Overwrite (#5619)
    
    
    Co-authored-by: xicm <[email protected]>
---
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 12 ++---
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  | 63 ++++++++++++----------
 .../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 23 ++++++++
 3 files changed, 65 insertions(+), 33 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 84280559e9..fe4391c0a5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -83,9 +83,9 @@ object HoodieSparkSqlWriter {
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     tableExists = fs.exists(new Path(basePath, 
HoodieTableMetaClient.METAFOLDER_NAME))
     var tableConfig = getHoodieTableConfig(sparkContext, path, 
hoodieTableConfigOpt)
-    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig)
+    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode 
== SaveMode.Overwrite)
 
-    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, 
tableConfig)
+    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, 
tableConfig, mode)
     val originKeyGeneratorClassName = 
HoodieWriterUtils.getOriginKeyGenerator(parameters)
     val timestampKeyGeneratorConfigs = 
extractConfigsRelatedToTimestampBasedKeyGenerator(
       originKeyGeneratorClassName, parameters)
@@ -408,9 +408,9 @@ object HoodieSparkSqlWriter {
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     tableExists = fs.exists(new Path(basePath, 
HoodieTableMetaClient.METAFOLDER_NAME))
     val tableConfig = getHoodieTableConfig(sparkContext, path, 
hoodieTableConfigOpt)
-    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig)
+    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode 
== SaveMode.Overwrite)
 
-    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, 
tableConfig)
+    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, 
tableConfig, mode)
     val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, 
s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
     val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
     val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
@@ -734,14 +734,14 @@ object HoodieSparkSqlWriter {
   }
 
   private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
-      tableConfig: HoodieTableConfig): (Map[String, String], HoodieConfig) = {
+      tableConfig: HoodieTableConfig, mode: SaveMode): (Map[String, String], 
HoodieConfig) = {
     val translatedOptions = 
DataSourceWriteOptions.translateSqlOptions(optParams)
     val mergedParams = mutable.Map.empty ++ 
HoodieWriterUtils.parametersWithWriteDefaults(translatedOptions)
     if (!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)
       && mergedParams.contains(KEYGENERATOR_CLASS_NAME.key)) {
       mergedParams(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key) = 
mergedParams(KEYGENERATOR_CLASS_NAME.key)
     }
-    if (null != tableConfig) {
+    if (null != tableConfig && mode != SaveMode.Overwrite) {
       tableConfig.getProps.foreach { case (key, value) =>
         mergedParams(key) = value
       }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index 63f1a7afc2..4967212675 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -118,47 +118,56 @@ object HoodieWriterUtils {
     }
   }
 
+  def validateTableConfig(spark: SparkSession, params: Map[String, String],
+                          tableConfig: HoodieConfig): Unit = {
+    validateTableConfig(spark, params, tableConfig, false)
+  }
+
   /**
    * Detects conflicts between new parameters and existing table configurations
    */
   def validateTableConfig(spark: SparkSession, params: Map[String, String],
-      tableConfig: HoodieConfig): Unit = {
-    val resolver = spark.sessionState.conf.resolver
-    val diffConfigs = StringBuilder.newBuilder
-    params.foreach { case (key, value) =>
-      val existingValue = 
getStringFromTableConfigWithAlternatives(tableConfig, key)
-      if (null != existingValue && !resolver(existingValue, value)) {
-        diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
+      tableConfig: HoodieConfig, isOverWriteMode: Boolean): Unit = {
+    // If Overwrite is set as save mode, we don't need to do table config 
validation.
+    if (!isOverWriteMode) {
+      val resolver = spark.sessionState.conf.resolver
+      val diffConfigs = StringBuilder.newBuilder
+      params.foreach { case (key, value) =>
+        val existingValue = 
getStringFromTableConfigWithAlternatives(tableConfig, key)
+        if (null != existingValue && !resolver(existingValue, value)) {
+          diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
+        }
       }
-    }
 
-    if (null != tableConfig) {
-      val datasourceRecordKey = params.getOrElse(RECORDKEY_FIELD.key(), null)
-      val tableConfigRecordKey = 
tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)
-      if (null != datasourceRecordKey && null != tableConfigRecordKey
+      if (null != tableConfig) {
+        val datasourceRecordKey = params.getOrElse(RECORDKEY_FIELD.key(), null)
+        val tableConfigRecordKey = 
tableConfig.getString(HoodieTableConfig.RECORDKEY_FIELDS)
+        if (null != datasourceRecordKey && null != tableConfigRecordKey
           && datasourceRecordKey != tableConfigRecordKey) {
-        
diffConfigs.append(s"RecordKey:\t$datasourceRecordKey\t$tableConfigRecordKey\n")
-      }
+          
diffConfigs.append(s"RecordKey:\t$datasourceRecordKey\t$tableConfigRecordKey\n")
+        }
 
-      val datasourcePreCombineKey = params.getOrElse(PRECOMBINE_FIELD.key(), 
null)
-      val tableConfigPreCombineKey = 
tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD)
-      if (null != datasourcePreCombineKey && null != tableConfigPreCombineKey
+        val datasourcePreCombineKey = params.getOrElse(PRECOMBINE_FIELD.key(), 
null)
+        val tableConfigPreCombineKey = 
tableConfig.getString(HoodieTableConfig.PRECOMBINE_FIELD)
+        if (null != datasourcePreCombineKey && null != tableConfigPreCombineKey
           && datasourcePreCombineKey != tableConfigPreCombineKey) {
-        
diffConfigs.append(s"PreCombineKey:\t$datasourcePreCombineKey\t$tableConfigPreCombineKey\n")
-      }
+          
diffConfigs.append(s"PreCombineKey:\t$datasourcePreCombineKey\t$tableConfigPreCombineKey\n")
+        }
 
-      val datasourceKeyGen = getOriginKeyGenerator(params)
-      val tableConfigKeyGen = 
tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
-      if (null != datasourceKeyGen && null != tableConfigKeyGen
+        val datasourceKeyGen = getOriginKeyGenerator(params)
+        val tableConfigKeyGen = 
tableConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
+        if (null != datasourceKeyGen && null != tableConfigKeyGen
           && datasourceKeyGen != tableConfigKeyGen) {
-        
diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
+          
diffConfigs.append(s"KeyGenerator:\t$datasourceKeyGen\t$tableConfigKeyGen\n")
+        }
       }
-    }
 
-    if (diffConfigs.nonEmpty) {
-      diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting 
value):\n")
-      throw new HoodieException(diffConfigs.toString.trim)
+      if (diffConfigs.nonEmpty) {
+        diffConfigs.insert(0, "\nConfig conflict(key\tcurrent value\texisting 
value):\n")
+        throw new HoodieException(diffConfigs.toString.trim)
+      }
     }
+
     // Check schema evolution for bootstrap table.
     // now we do not support bootstrap table.
     if (params.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index 339dbb5c71..928b1b1a1e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -272,6 +272,29 @@ class TestHoodieSparkSqlWriter {
     
assert(tableAlreadyExistException.getMessage.contains(s"${HoodieWriteConfig.TBL_NAME.key}:\thoodie_bar_tbl\thoodie_foo_tbl"))
   }
 
+  /**
+    * Test case for Do not validate table config if save mode is set to 
Overwrite
+    */
+  @Test
+  def testValidateTableConfigWithOverwriteSaveMode(): Unit = {
+    //create a new table
+    val tableModifier1 = Map("path" -> tempBasePath, 
HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      "hoodie.datasource.write.recordkey.field" -> "uuid")
+    val dataFrame = 
spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new 
Date().getTime)))
+    HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, tableModifier1, 
dataFrame)
+
+    //on same path try write with different RECORDKEY_FIELD_NAME and Append 
SaveMode should throw an exception
+    val tableModifier2 = Map("path" -> tempBasePath, 
HoodieWriteConfig.TBL_NAME.key -> hoodieFooTableName,
+      "hoodie.datasource.write.recordkey.field" -> "ts")
+    val dataFrame2 = 
spark.createDataFrame(Seq(StringLongTest(UUID.randomUUID().toString, new 
Date().getTime)))
+    val hoodieException = 
intercept[HoodieException](HoodieSparkSqlWriter.write(sqlContext, 
SaveMode.Append, tableModifier2, dataFrame2))
+    assert(hoodieException.getMessage.contains("Config conflict"))
+    assert(hoodieException.getMessage.contains(s"RecordKey:\tts\tuuid"))
+
+    //on same path try write with different RECORDKEY_FIELD_NAME and Overwrite 
SaveMode should be successful.
+    assert(HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, 
tableModifier2, dataFrame2)._1)
+  }
+
   /**
    * Test case for each bulk insert sort mode
    *

Reply via email to