This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f0f8d618b30 [HUDI-5499] Fixing Spark SQL configs not being properly propagated for CTAS and other commands (#7607)
f0f8d618b30 is described below

commit f0f8d618b30eee73baf02806d915a8b12034edad
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Fri Jan 20 13:53:25 2023 -0800

    [HUDI-5499] Fixing Spark SQL configs not being properly propagated for CTAS and other commands (#7607)
    
    ### Change Logs
    
    While following up on and adding support for the BrooklynData benchmarks, we
    discovered that CTAS isn't properly propagating configs, due to a recent change in
    [#5178](https://github.com/apache/hudi/pull/5178/files#diff-560283e494c8ba8da102fc217a2201220dd4db731ec23d80884e0f001a7cc0bcR117).
    
    Unfortunately, the configuration-handling logic in `ProvidesHoodieConfig` has
    become overly complicated and fragmented.
    
    This PR takes a stab at unifying and streamlining how options from different
    sources (Spark Catalog properties, table properties, the Spark SQL conf, explicit
    overrides, etc.) are fused, making sure the various Spark SQL operations (for
    example `MERGE INTO`, CTAS, `INSERT INTO`) handle them in much the same way.
    
    Changes
    
     - Simplify and unify how `ProvidesHoodieConfig` fuses configuration from
       different sources (a minimal sketch of the priority ordering follows below)
     - Fix CTAS to override "hoodie.combine.before.insert" to "false"
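    
    For illustration only, here is a minimal sketch of the priority-ordered fusion the
    changes aim for (names and helper signatures are assumptions made for this sketch,
    not the exact ones introduced by the PR; later sources win on key conflicts):
    
    ```scala
    // Illustrative sketch: fuse options from lowest to highest priority, so that
    // later maps override earlier ones. Only "hoodie."-prefixed Spark SQL confs
    // are considered, and null-valued overrides are dropped.
    object OptionFusionSketch {
      def fuse(globalProps: Map[String, String],   // global DFS properties
               catalogProps: Map[String, String],  // table properties from the Spark Catalog
               tableConfig: Map[String, String],   // persisted Hudi table config
               sqlConf: Map[String, String],       // Spark SQL conf
               overrides: Map[String, String]      // per-command overrides (e.g. CTAS)
              ): Map[String, String] = {
        val hoodieSqlConf = sqlConf.filter { case (k, _) => k.startsWith("hoodie.") }
        val nonNullOverrides = overrides.filter { case (_, v) => v != null }
        globalProps ++ catalogProps ++ tableConfig ++ hoodieSqlConf ++ nonNullOverrides
      }
    }
    ```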
---
 .../apache/spark/sql/hudi/HoodieOptionConfig.scala |  12 --
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      | 128 +++++++++++----------
 .../command/CreateHoodieTableAsSelectCommand.scala |   7 +-
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  16 +--
 .../command/procedures/HiveSyncProcedure.scala     |   3 +-
 .../spark/sql/hudi/HoodieSparkSqlTestBase.scala    |  26 ++++-
 .../apache/spark/sql/hudi/TestCreateTable.scala    |  20 +++-
 .../apache/spark/sql/hudi/TestInsertTable.scala    |   9 ++
 8 files changed, 131 insertions(+), 90 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index 7da69b2c0bb..bb682cf9b5f 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -162,18 +162,6 @@ object HoodieOptionConfig {
       .toMap
   }
 
-  /**
-   * Get the primary key from the table options.
-   * @param options
-   * @return
-   */
-  def getPrimaryColumns(options: Map[String, String]): Array[String] = {
-    val params = mapSqlOptionsToDataSourceWriteConfigs(options)
-    params.get(DataSourceWriteOptions.RECORDKEY_FIELD.key)
-      .map(_.split(",").filter(_.nonEmpty))
-      .getOrElse(Array.empty)
-  }
-
   /**
    * Get the table type from the table options.
    * @param options
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index bf6b6509b95..77ff939cf26 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark.sql.hudi
 
+import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.HoodieConversionUtils.toProperties
+import org.apache.hudi.common.config.{DFSPropertiesConfiguration, 
TypedProperties}
 import org.apache.hudi.common.model.{OverwriteWithLatestAvroPayload, 
WriteOperationType}
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
@@ -28,12 +30,13 @@ import org.apache.hudi.hive.{HiveSyncConfig, 
HiveSyncConfigHolder, MultiPartKeys
 import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.hudi.sql.InsertMode
 import org.apache.hudi.sync.common.HoodieSyncConfig
-import org.apache.hudi.{DataSourceWriteOptions, HoodieWriterUtils}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.hive.HiveExternalCatalog
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isUsingHiveCatalog, 
withSparkConf}
+import 
org.apache.spark.sql.hudi.HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isHoodieConfigKey, 
isUsingHiveCatalog}
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions, 
withCombinedOptions}
 import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, 
ValidateDuplicateKeyPayload}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -45,7 +48,6 @@ trait ProvidesHoodieConfig extends Logging {
 
   def buildHoodieConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String, 
String] = {
     val sparkSession: SparkSession = hoodieCatalogTable.spark
-    val catalogProperties = hoodieCatalogTable.catalogProperties
     val tableConfig = hoodieCatalogTable.tableConfig
 
     // NOTE: Here we fallback to "" to make sure that null value is not 
overridden with
@@ -56,17 +58,14 @@ trait ProvidesHoodieConfig extends Logging {
     require(hoodieCatalogTable.primaryKeys.nonEmpty,
       s"There are no primary key in table 
${hoodieCatalogTable.table.identifier}, cannot execute update operator")
 
-    val hoodieProps = getHoodieProps(catalogProperties, tableConfig, 
sparkSession.sqlContext.conf)
+    val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, 
tableConfig)
 
-    val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
-
-    withSparkConf(sparkSession, catalogProperties) {
+    withCombinedOptions(hoodieCatalogTable, tableConfig, 
sparkSession.sqlContext.conf) {
       Map.apply(
         "path" -> hoodieCatalogTable.tableLocation,
         RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
         TBL_NAME.key -> hoodieCatalogTable.tableName,
         PRECOMBINE_FIELD.key -> preCombineField,
-        RECORD_MERGER_IMPLS.key -> 
hoodieProps.getString(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, 
HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue),
         HIVE_STYLE_PARTITIONING.key -> 
tableConfig.getHiveStylePartitioningEnable,
         URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
         KEYGENERATOR_CLASS_NAME.key -> 
classOf[SqlKeyGenerator].getCanonicalName,
@@ -81,10 +80,8 @@ trait ProvidesHoodieConfig extends Logging {
         HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> 
tableConfig.getPartitionFieldProp,
         HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> 
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
         HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> 
hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
-        HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> 
hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
         SqlKeyGenerator.PARTITION_SCHEMA -> 
hoodieCatalogTable.partitionSchema.toDDL
       )
-        .filter { case(_, v) => v != null }
     }
   }
 
@@ -109,12 +106,9 @@ trait ProvidesHoodieConfig extends Logging {
     val path = hoodieCatalogTable.tableLocation
     val tableType = hoodieCatalogTable.tableTypeName
     val tableConfig = hoodieCatalogTable.tableConfig
-    val catalogProperties = hoodieCatalogTable.catalogProperties
-
-    val hoodieProps = getHoodieProps(catalogProperties, tableConfig, 
sparkSession.sqlContext.conf, extraOptions)
-    val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
 
-    val parameters = withSparkConf(sparkSession, catalogProperties)()
+    val combinedOpts = combineOptions(hoodieCatalogTable, tableConfig, 
sparkSession.sqlContext.conf, extraOptions)
+    val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, 
tableConfig, extraOptions)
 
     val partitionFieldsStr = hoodieCatalogTable.partitionFields.mkString(",")
 
@@ -128,18 +122,21 @@ trait ProvidesHoodieConfig extends Logging {
     val keyGeneratorClassName = Option(tableConfig.getKeyGeneratorClassName)
       .getOrElse(classOf[ComplexKeyGenerator].getCanonicalName)
 
-    val enableBulkInsert = 
parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
-      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean 
||
-      
parameters.get(DataSourceWriteOptions.OPERATION.key).exists(_.equalsIgnoreCase(WriteOperationType.BULK_INSERT.value))
+    val enableBulkInsert = 
combinedOpts.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+      DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
     val dropDuplicate = sparkSession.conf
       
.getOption(INSERT_DROP_DUPS.key).getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean
 
-    val insertMode = 
InsertMode.of(parameters.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key,
+    val insertMode = 
InsertMode.of(combinedOpts.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key,
       DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue()))
     val isNonStrictMode = insertMode == InsertMode.NON_STRICT
     val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
     val hasPrecombineColumn = hoodieCatalogTable.preCombineKey.nonEmpty
-    val operation =
+
+    // NOTE: Target operation could be overridden by the user, therefore if it 
has been provided as an input
+    //       we'd prefer that value over auto-deduced operation. Otherwise, we 
deduce target operation type
+    val operationOverride = 
combinedOpts.get(DataSourceWriteOptions.OPERATION.key)
+    val operation = operationOverride.getOrElse {
       (enableBulkInsert, isOverwritePartition, isOverwriteTable, 
dropDuplicate, isNonStrictMode, isPartitionedTable) match {
         case (true, _, _, _, false, _) =>
           throw new IllegalArgumentException(s"Table with primaryKey can not 
use bulk insert in ${insertMode.value()} mode.")
@@ -161,6 +158,7 @@ trait ProvidesHoodieConfig extends Logging {
         // for the rest case, use the insert operation
         case _ => INSERT_OPERATION_OPT_VAL
       }
+    }
 
     val payloadClassName = if (operation == UPSERT_OPERATION_OPT_VAL &&
       tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) {
@@ -176,10 +174,7 @@ trait ProvidesHoodieConfig extends Logging {
       classOf[OverwriteWithLatestAvroPayload].getCanonicalName
     }
 
-
-    logInfo(s"Insert statement use write operation type: $operation, 
payloadClass: $payloadClassName")
-
-    withSparkConf(sparkSession, catalogProperties) {
+    withCombinedOptions(hoodieCatalogTable, tableConfig, 
sparkSession.sqlContext.conf) {
       Map(
         "path" -> path,
         TABLE_TYPE.key -> tableType,
@@ -193,8 +188,6 @@ trait ProvidesHoodieConfig extends Logging {
         PRECOMBINE_FIELD.key -> preCombineField,
         PARTITIONPATH_FIELD.key -> partitionFieldsStr,
         PAYLOAD_CLASS_NAME.key -> payloadClassName,
-        RECORD_MERGER_IMPLS.key -> 
hoodieProps.getString(HoodieWriteConfig.RECORD_MERGER_IMPLS.key, 
HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue),
-        ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
         HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> 
String.valueOf(hasPrecombineColumn),
         HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
         HoodieSyncConfig.META_SYNC_ENABLED.key -> 
hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
@@ -204,26 +197,20 @@ trait ProvidesHoodieConfig extends Logging {
         HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> 
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
         HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> 
hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
         HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> 
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
-        HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> 
hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"),
-        HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> 
hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
         SqlKeyGenerator.PARTITION_SCHEMA -> 
hoodieCatalogTable.partitionSchema.toDDL
       )
-        .filter { case (_, v) => v != null }
     }
   }
 
-  def buildHoodieDropPartitionsConfig(
-                                 sparkSession: SparkSession,
-                                 hoodieCatalogTable: HoodieCatalogTable,
-                                 partitionsToDrop: String): Map[String, 
String] = {
+  def buildHoodieDropPartitionsConfig(sparkSession: SparkSession,
+                                      hoodieCatalogTable: HoodieCatalogTable,
+                                      partitionsToDrop: String): Map[String, 
String] = {
     val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
-    val catalogProperties = hoodieCatalogTable.catalogProperties
     val tableConfig = hoodieCatalogTable.tableConfig
 
-    val hoodieProps = getHoodieProps(catalogProperties, tableConfig, 
sparkSession.sqlContext.conf)
-    val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
+    val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, 
tableConfig)
 
-    withSparkConf(sparkSession, catalogProperties) {
+    withCombinedOptions(hoodieCatalogTable, tableConfig, 
sparkSession.sqlContext.conf) {
       Map(
         "path" -> hoodieCatalogTable.tableLocation,
         TBL_NAME.key -> hoodieCatalogTable.tableName,
@@ -242,14 +229,12 @@ trait ProvidesHoodieConfig extends Logging {
         HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
         HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> 
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
       )
-        .filter { case (_, v) => v != null }
     }
   }
 
   def buildHoodieDeleteTableConfig(hoodieCatalogTable: HoodieCatalogTable,
                                    sparkSession: SparkSession): Map[String, 
String] = {
     val path = hoodieCatalogTable.tableLocation
-    val catalogProperties = hoodieCatalogTable.catalogProperties
     val tableConfig = hoodieCatalogTable.tableConfig
     val tableSchema = hoodieCatalogTable.tableSchema
     val partitionColumns = 
tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase(Locale.ROOT))
@@ -258,14 +243,9 @@ trait ProvidesHoodieConfig extends Logging {
     assert(hoodieCatalogTable.primaryKeys.nonEmpty,
       s"There are no primary key defined in table 
${hoodieCatalogTable.table.identifier}, cannot execute delete operation")
 
-    val hoodieProps = getHoodieProps(catalogProperties, tableConfig, 
sparkSession.sqlContext.conf)
-    val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
+    val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, 
tableConfig)
 
-    val options = hoodieCatalogTable.catalogProperties
-    val enableHive = isUsingHiveCatalog(sparkSession)
-    val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
-
-    withSparkConf(sparkSession, options) {
+    withCombinedOptions(hoodieCatalogTable, tableConfig, 
sparkSession.sqlContext.conf) {
       Map(
         "path" -> path,
         RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
@@ -282,24 +262,20 @@ trait ProvidesHoodieConfig extends Logging {
         HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> 
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
         HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> 
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
         HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> 
hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
-        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
+        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> 
hoodieCatalogTable.partitionFields.mkString(","),
         HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> 
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
-        HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> 
hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"),
         SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
       )
     }
   }
 
-  def getHoodieProps(catalogProperties: Map[String, String], tableConfig: 
HoodieTableConfig, conf: SQLConf, extraOptions: Map[String, String] = 
Map.empty): TypedProperties = {
-    val options: Map[String, String] = catalogProperties ++ 
tableConfig.getProps.asScala.toMap ++ conf.getAllConfs ++ extraOptions
-    val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(options)
-    hoodieConfig.getProps
-  }
+  def buildHiveSyncConfig(sparkSession: SparkSession,
+                          hoodieCatalogTable: HoodieCatalogTable,
+                          tableConfig: HoodieTableConfig,
+                          extraOptions: Map[String, String] = Map.empty): 
HiveSyncConfig = {
+    val combinedOpts = combineOptions(hoodieCatalogTable, tableConfig, 
sparkSession.sqlContext.conf, extraOptions)
+    val props = new TypedProperties(toProperties(combinedOpts))
 
-  def buildHiveSyncConfig(
-     props: TypedProperties,
-     hoodieCatalogTable: HoodieCatalogTable,
-     sparkSession: SparkSession = SparkSession.active): HiveSyncConfig = {
     // Enable the hive sync by default if spark have enable the hive metastore.
     val enableHive = isUsingHiveCatalog(sparkSession)
     val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig(props)
@@ -325,3 +301,39 @@ trait ProvidesHoodieConfig extends Logging {
     hiveSyncConfig
   }
 }
+
+object ProvidesHoodieConfig {
+
+  def filterNullValues(opts: Map[String, String]): Map[String, String] =
+    opts.filter { case (_, v) => v != null }
+
+  def withCombinedOptions(catalogTable: HoodieCatalogTable,
+                          tableConfig: HoodieTableConfig,
+                          sqlConf: SQLConf)(optionOverrides: Map[String, 
String] = Map.empty): Map[String, String] = {
+    combineOptions(catalogTable, tableConfig, sqlConf, optionOverrides)
+  }
+
+  private def combineOptions(catalogTable: HoodieCatalogTable,
+                             tableConfig: HoodieTableConfig,
+                             sqlConf: SQLConf,
+                             optionOverrides: Map[String, String] = 
Map.empty): Map[String, String] = {
+    // NOTE: Properties are merged in the following order of priority (first 
has the highest priority, last has the
+    //       lowest, which is inverse to the ordering in the code):
+    //          1. (Extra) Option overrides
+    //          2. Spark SQL configs
+    //          3. Persisted Hudi's Table configs
+    //          4. Table's properties in Spark Catalog
+    //          5. Global DFS properties
+    DFSPropertiesConfiguration.getGlobalProps.asScala.toMap ++
+      // NOTE: Catalog table provided t/h `TBLPROPERTIES` clause might contain 
Spark SQL specific
+      //       properties that need to be mapped into Hudi's conventional ones
+      mapSqlOptionsToDataSourceWriteConfigs(catalogTable.catalogProperties) ++
+      tableConfig.getProps.asScala.toMap ++
+      filterHoodieConfigs(sqlConf.getAllConfs) ++
+      filterNullValues(optionOverrides)
+  }
+
+  private def filterHoodieConfigs(opts: Map[String, String]): Map[String, 
String] =
+    opts.filterKeys(isHoodieConfigKey)
+
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
index 1f8d0095301..0e19514e28c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hudi.DataSourceWriteOptions
 import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sql.InsertMode
 import org.apache.hudi.sync.common.util.ConfigUtils
@@ -92,13 +93,11 @@ case class CreateHoodieTableAsSelectCommand(
       hoodieCatalogTable.initHoodieTable()
 
       val tableProperties = hoodieCatalogTable.catalogProperties
-      // NOTE: Users might be specifying write-configuration (inadvertently) 
as options or table properties
-      //       in CTAS, therefore we need to make sure that these are 
appropriately propagated to the
-      //       write operation
-      val options = tableProperties ++ Map(
+      val options = Map(
         HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE.key -> (table.tableType 
== CatalogTableType.MANAGED).toString,
         HiveSyncConfigHolder.HIVE_TABLE_SERDE_PROPERTIES.key -> 
ConfigUtils.configToString(tableProperties.asJava),
         HiveSyncConfigHolder.HIVE_TABLE_PROPERTIES.key -> 
ConfigUtils.configToString(updatedTable.properties.asJava),
+        HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> "false",
         DataSourceWriteOptions.SQL_INSERT_MODE.key -> 
InsertMode.NON_STRICT.value(),
         DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
       )
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 7befb97c7b8..9099c7225e0 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -20,11 +20,10 @@ package org.apache.spark.sql.hudi.command
 import org.apache.avro.Schema
 import org.apache.hudi.AvroConversionUtils.convertStructTypeToAvroSchema
 import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.common.model.HoodieAvroRecordMerger
+import org.apache.hudi.common.util.StringUtils
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE, 
TBL_NAME}
-import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.hive.HiveSyncConfigHolder
 import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -38,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, 
Attribute, AttributeRef
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.hudi.HoodieSqlUtils.getMergeIntoTargetTableId
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig.withCombinedOptions
 import 
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.CoercedAttributeReference
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
@@ -491,8 +491,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     val targetTableDb = targetTableIdentify.database.getOrElse("default")
     val targetTableName = targetTableIdentify.identifier
     val path = hoodieCatalogTable.tableLocation
-    // force to use ExpressionPayload as WRITE_PAYLOAD_CLASS_NAME in 
MergeIntoHoodieTableCommand
-    val catalogProperties = hoodieCatalogTable.catalogProperties + 
(PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName)
     val tableConfig = hoodieCatalogTable.tableConfig
     val tableSchema = hoodieCatalogTable.tableSchema
     val partitionColumns = 
tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
@@ -503,10 +501,9 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
     // TODO(HUDI-3456) clean up
     val preCombineField = hoodieCatalogTable.preCombineKey.getOrElse("")
 
-    val hoodieProps = getHoodieProps(catalogProperties, tableConfig, 
sparkSession.sqlContext.conf)
-    val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
+    val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, 
tableConfig)
 
-    withSparkConf(sparkSession, catalogProperties) {
+    withCombinedOptions(hoodieCatalogTable, tableConfig, 
sparkSession.sqlContext.conf) {
       Map(
         "path" -> path,
         RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
@@ -525,10 +522,8 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
         HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> 
hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
         HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> 
tableConfig.getPartitionFieldProp,
         HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> 
hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
-        HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key -> 
hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"), 
// set the default parallelism to 200 for sql
-        HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key -> 
hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
-        HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key -> 
hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"),
         SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL,
+        PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
 
         // NOTE: We have to explicitly override following configs to make sure 
no schema validation is performed
         //       as schema of the incoming dataset might be diverging from the 
table's schema (full schemas'
@@ -539,7 +534,6 @@ case class MergeIntoHoodieTableCommand(mergeInto: 
MergeIntoTable) extends Hoodie
         RECONCILE_SCHEMA.key -> "false",
         "hoodie.datasource.write.schema.canonicalize" -> "false"
       )
-        .filter { case (_, v) => v != null }
     }
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala
index f237a1dcbb4..513f40a4c8c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala
@@ -84,8 +84,7 @@ class HiveSyncProcedure extends BaseProcedure with 
ProcedureBuilder
     hiveConf.addResource(hadoopConf)
 
     val tableConfig = hoodieCatalogTable.tableConfig
-    val hoodieProps = getHoodieProps(hoodieCatalogTable.catalogProperties, 
tableConfig, sqlConf)
-    val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
+    val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, 
tableConfig)
 
     var hiveSyncTool: HiveSyncTool = null
     try {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
index cf37f7436fd..6beed5cfc39 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
@@ -23,11 +23,14 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.config.HoodieStorageConfig
 import org.apache.hudi.common.model.HoodieAvroRecordMerger
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.ExceptionUtil.getRootCause
 import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
 import org.apache.log4j.Level
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.checkMessageContains
 import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.util.Utils
 import org.joda.time.DateTimeZone
@@ -144,8 +147,11 @@ class HoodieSparkSqlTestBase extends FunSuite with 
BeforeAndAfterAll {
     try {
       spark.sql(sql)
     } catch {
-      case e: Throwable if e.getMessage.trim.contains(errorMsg.trim) => 
hasException = true
-      case f: Throwable => fail("Exception should contain: " + errorMsg + ", 
error message: " + f.getMessage, f)
+      case e: Throwable if checkMessageContains(e, errorMsg) || 
checkMessageContains(getRootCause(e), errorMsg) =>
+        hasException = true
+
+      case f: Throwable =>
+        fail("Exception should contain: " + errorMsg + ", error message: " + 
f.getMessage, f)
     }
     assertResult(true)(hasException)
   }
@@ -219,3 +225,19 @@ class HoodieSparkSqlTestBase extends FunSuite with 
BeforeAndAfterAll {
     }
   }
 }
+
+object HoodieSparkSqlTestBase {
+
+  def getLastCommitMetadata(spark: SparkSession, tablePath: String) = {
+    val metaClient = HoodieTableMetaClient.builder()
+      .setConf(spark.sparkContext.hadoopConfiguration)
+      .setBasePath(tablePath)
+      .build()
+
+    
metaClient.getActiveTimeline.getLastCommitMetadataWithValidData.get.getRight
+  }
+
+  private def checkMessageContains(e: Throwable, text: String): Boolean =
+    e.getMessage.trim.contains(text.trim)
+
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 9fc910c4f1f..120c6adb6cd 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi
 
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieSparkUtils
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.util.PartitionPathEncodeUtils.escapePathName
 import org.apache.hudi.config.HoodieWriteConfig
@@ -28,6 +28,7 @@ import org.apache.hudi.keygen.{ComplexKeyGenerator, 
NonpartitionedKeyGenerator,
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCommitMetadata
 import org.apache.spark.sql.types._
 import org.junit.jupiter.api.Assertions.assertFalse
 
@@ -299,6 +300,10 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
              | AS
              | select 1 as id, 'a1' as name, 10 as price, 1000 as ts
        """.stripMargin)
+
+        assertResult(WriteOperationType.BULK_INSERT) {
+          getLastCommitMetadata(spark, 
s"${tmp.getCanonicalPath}/$tableName1").getOperationType
+        }
         checkAnswer(s"select id, name, price, ts from $tableName1")(
           Seq(1, "a1", 10.0, 1000)
         )
@@ -318,6 +323,10 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
              | select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt
          """.stripMargin
         )
+
+        assertResult(WriteOperationType.BULK_INSERT) {
+          getLastCommitMetadata(spark, 
s"${tmp.getCanonicalPath}/$tableName2").getOperationType
+        }
         checkAnswer(s"select id, name, price, dt from $tableName2")(
           Seq(1, "a1", 10, "2021-04-01")
         )
@@ -356,9 +365,14 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
              | price
          """.stripMargin
         )
+
+        assertResult(WriteOperationType.BULK_INSERT) {
+          getLastCommitMetadata(spark, 
s"${tmp.getCanonicalPath}/$tableName3").getOperationType
+        }
         checkAnswer(s"select id, name, price, cast(dt as string) from 
$tableName3")(
           Seq(1, "a1", 10, "2021-05-06 00:00:00")
         )
+
         // Create table with date type partition
         val tableName4 = generateTableName
         spark.sql(
@@ -375,6 +389,10 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
              | price
          """.stripMargin
         )
+
+        assertResult(WriteOperationType.BULK_INSERT) {
+          getLastCommitMetadata(spark, 
s"${tmp.getCanonicalPath}/$tableName4").getOperationType
+        }
         checkAnswer(s"select id, name, price, cast(dt as string) from 
$tableName4")(
           Seq(1, "a1", 10, "2021-05-06")
         )
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 868a96ebc2c..a227a20b8b6 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -22,11 +22,13 @@ import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.model.WriteOperationType
 import 
org.apache.spark.sql.hudi.command.HoodieSparkValidateDuplicateKeyRecordMerger
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.HoodieDuplicateKeyException
 import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCommitMetadata
 
 import java.io.File
 
@@ -725,13 +727,20 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
           spark.sql("set hoodie.sql.bulk.insert.enable = true")
           spark.sql(s"insert into $tableName values(1, 'a1', 10, 
'2021-07-18')")
 
+          assertResult(WriteOperationType.BULK_INSERT) {
+            getLastCommitMetadata(spark, 
s"${tmp.getCanonicalPath}/$tableName").getOperationType
+          }
           checkAnswer(s"select id, name, price, dt from $tableName")(
             Seq(1, "a1", 10.0, "2021-07-18")
           )
+
           // Disable the bulk insert
           spark.sql("set hoodie.sql.bulk.insert.enable = false")
           spark.sql(s"insert into $tableName values(2, 'a2', 10, 
'2021-07-18')")
 
+          assertResult(WriteOperationType.INSERT) {
+            getLastCommitMetadata(spark, 
s"${tmp.getCanonicalPath}/$tableName").getOperationType
+          }
           checkAnswer(s"select id, name, price, dt from $tableName order by 
id")(
             Seq(1, "a1", 10.0, "2021-07-18"),
             Seq(2, "a2", 10.0, "2021-07-18")

