This is an automated email from the ASF dual-hosted git repository.
akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f0f8d618b30 [HUDI-5499] Fixing Spark SQL configs not being properly
propagated for CTAS and other commands (#7607)
f0f8d618b30 is described below
commit f0f8d618b30eee73baf02806d915a8b12034edad
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Fri Jan 20 13:53:25 2023 -0800
[HUDI-5499] Fixing Spark SQL configs not being properly propagated for CTAS
and other commands (#7607)
### Change Logs
While following up and adding support for BrooklynData Benchmarks we've
discovered that CTAS isn't properly propagating configs due to a recent change
in
[#5178](https://github.com/apache/hudi/pull/5178/files#diff-560283e494c8ba8da102fc217a2201220dd4db731ec23d80884e0f001a7cc0bcR117)
Unfortunately, the logic for handling configuration in `ProvidesHoodieConfig`
has become overly complicated and fragmented.
This PR takes a stab at unifying and streamlining how the options from
different sources (Spark Catalog props, Table properties, Spark SQL conf,
overrides, etc.) are fused, making sure different Spark SQL operations handle
them in much the same way (e.g. `MERGE INTO`, CTAS, `INSERT INTO`, etc.).
Changes
- Simplify and unify `ProvidesHoodieConfig` configuration fusion from
different sources
- Fix CTAS to override "hoodie.combine.before.insert" to "false"
---
.../apache/spark/sql/hudi/HoodieOptionConfig.scala | 12 --
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 128 +++++++++++----------
.../command/CreateHoodieTableAsSelectCommand.scala | 7 +-
.../hudi/command/MergeIntoHoodieTableCommand.scala | 16 +--
.../command/procedures/HiveSyncProcedure.scala | 3 +-
.../spark/sql/hudi/HoodieSparkSqlTestBase.scala | 26 ++++-
.../apache/spark/sql/hudi/TestCreateTable.scala | 20 +++-
.../apache/spark/sql/hudi/TestInsertTable.scala | 9 ++
8 files changed, 131 insertions(+), 90 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
index 7da69b2c0bb..bb682cf9b5f 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala
@@ -162,18 +162,6 @@ object HoodieOptionConfig {
.toMap
}
- /**
- * Get the primary key from the table options.
- * @param options
- * @return
- */
- def getPrimaryColumns(options: Map[String, String]): Array[String] = {
- val params = mapSqlOptionsToDataSourceWriteConfigs(options)
- params.get(DataSourceWriteOptions.RECORDKEY_FIELD.key)
- .map(_.split(",").filter(_.nonEmpty))
- .getOrElse(Array.empty)
- }
-
/**
* Get the table type from the table options.
* @param options
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index bf6b6509b95..77ff939cf26 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -17,8 +17,10 @@
package org.apache.spark.sql.hudi
+import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.common.config.TypedProperties
+import org.apache.hudi.HoodieConversionUtils.toProperties
+import org.apache.hudi.common.config.{DFSPropertiesConfiguration,
TypedProperties}
import org.apache.hudi.common.model.{OverwriteWithLatestAvroPayload,
WriteOperationType}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
@@ -28,12 +30,13 @@ import org.apache.hudi.hive.{HiveSyncConfig,
HiveSyncConfigHolder, MultiPartKeys
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.sync.common.HoodieSyncConfig
-import org.apache.hudi.{DataSourceWriteOptions, HoodieWriterUtils}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.hive.HiveExternalCatalog
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isUsingHiveCatalog,
withSparkConf}
+import
org.apache.spark.sql.hudi.HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isHoodieConfigKey,
isUsingHiveCatalog}
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions,
withCombinedOptions}
import org.apache.spark.sql.hudi.command.{SqlKeyGenerator,
ValidateDuplicateKeyPayload}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
@@ -45,7 +48,6 @@ trait ProvidesHoodieConfig extends Logging {
def buildHoodieConfig(hoodieCatalogTable: HoodieCatalogTable): Map[String,
String] = {
val sparkSession: SparkSession = hoodieCatalogTable.spark
- val catalogProperties = hoodieCatalogTable.catalogProperties
val tableConfig = hoodieCatalogTable.tableConfig
// NOTE: Here we fallback to "" to make sure that null value is not
overridden with
@@ -56,17 +58,14 @@ trait ProvidesHoodieConfig extends Logging {
require(hoodieCatalogTable.primaryKeys.nonEmpty,
s"There are no primary key in table
${hoodieCatalogTable.table.identifier}, cannot execute update operator")
- val hoodieProps = getHoodieProps(catalogProperties, tableConfig,
sparkSession.sqlContext.conf)
+ val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable,
tableConfig)
- val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
-
- withSparkConf(sparkSession, catalogProperties) {
+ withCombinedOptions(hoodieCatalogTable, tableConfig,
sparkSession.sqlContext.conf) {
Map.apply(
"path" -> hoodieCatalogTable.tableLocation,
RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
TBL_NAME.key -> hoodieCatalogTable.tableName,
PRECOMBINE_FIELD.key -> preCombineField,
- RECORD_MERGER_IMPLS.key ->
hoodieProps.getString(HoodieWriteConfig.RECORD_MERGER_IMPLS.key,
HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue),
HIVE_STYLE_PARTITIONING.key ->
tableConfig.getHiveStylePartitioningEnable,
URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
KEYGENERATOR_CLASS_NAME.key ->
classOf[SqlKeyGenerator].getCanonicalName,
@@ -81,10 +80,8 @@ trait ProvidesHoodieConfig extends Logging {
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key ->
tableConfig.getPartitionFieldProp,
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key ->
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key ->
hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
- HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key ->
hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
SqlKeyGenerator.PARTITION_SCHEMA ->
hoodieCatalogTable.partitionSchema.toDDL
)
- .filter { case(_, v) => v != null }
}
}
@@ -109,12 +106,9 @@ trait ProvidesHoodieConfig extends Logging {
val path = hoodieCatalogTable.tableLocation
val tableType = hoodieCatalogTable.tableTypeName
val tableConfig = hoodieCatalogTable.tableConfig
- val catalogProperties = hoodieCatalogTable.catalogProperties
-
- val hoodieProps = getHoodieProps(catalogProperties, tableConfig,
sparkSession.sqlContext.conf, extraOptions)
- val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
- val parameters = withSparkConf(sparkSession, catalogProperties)()
+ val combinedOpts = combineOptions(hoodieCatalogTable, tableConfig,
sparkSession.sqlContext.conf, extraOptions)
+ val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable,
tableConfig, extraOptions)
val partitionFieldsStr = hoodieCatalogTable.partitionFields.mkString(",")
@@ -128,18 +122,21 @@ trait ProvidesHoodieConfig extends Logging {
val keyGeneratorClassName = Option(tableConfig.getKeyGeneratorClassName)
.getOrElse(classOf[ComplexKeyGenerator].getCanonicalName)
- val enableBulkInsert =
parameters.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
- DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
||
-
parameters.get(DataSourceWriteOptions.OPERATION.key).exists(_.equalsIgnoreCase(WriteOperationType.BULK_INSERT.value))
+ val enableBulkInsert =
combinedOpts.getOrElse(DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key,
+ DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.defaultValue()).toBoolean
val dropDuplicate = sparkSession.conf
.getOption(INSERT_DROP_DUPS.key).getOrElse(INSERT_DROP_DUPS.defaultValue).toBoolean
- val insertMode =
InsertMode.of(parameters.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key,
+ val insertMode =
InsertMode.of(combinedOpts.getOrElse(DataSourceWriteOptions.SQL_INSERT_MODE.key,
DataSourceWriteOptions.SQL_INSERT_MODE.defaultValue()))
val isNonStrictMode = insertMode == InsertMode.NON_STRICT
val isPartitionedTable = hoodieCatalogTable.partitionFields.nonEmpty
val hasPrecombineColumn = hoodieCatalogTable.preCombineKey.nonEmpty
- val operation =
+
+ // NOTE: Target operation could be overridden by the user, therefore if it
has been provided as an input
+ // we'd prefer that value over auto-deduced operation. Otherwise, we
deduce target operation type
+ val operationOverride =
combinedOpts.get(DataSourceWriteOptions.OPERATION.key)
+ val operation = operationOverride.getOrElse {
(enableBulkInsert, isOverwritePartition, isOverwriteTable,
dropDuplicate, isNonStrictMode, isPartitionedTable) match {
case (true, _, _, _, false, _) =>
throw new IllegalArgumentException(s"Table with primaryKey can not
use bulk insert in ${insertMode.value()} mode.")
@@ -161,6 +158,7 @@ trait ProvidesHoodieConfig extends Logging {
// for the rest case, use the insert operation
case _ => INSERT_OPERATION_OPT_VAL
}
+ }
val payloadClassName = if (operation == UPSERT_OPERATION_OPT_VAL &&
tableType == COW_TABLE_TYPE_OPT_VAL && insertMode == InsertMode.STRICT) {
@@ -176,10 +174,7 @@ trait ProvidesHoodieConfig extends Logging {
classOf[OverwriteWithLatestAvroPayload].getCanonicalName
}
-
- logInfo(s"Insert statement use write operation type: $operation,
payloadClass: $payloadClassName")
-
- withSparkConf(sparkSession, catalogProperties) {
+ withCombinedOptions(hoodieCatalogTable, tableConfig,
sparkSession.sqlContext.conf) {
Map(
"path" -> path,
TABLE_TYPE.key -> tableType,
@@ -193,8 +188,6 @@ trait ProvidesHoodieConfig extends Logging {
PRECOMBINE_FIELD.key -> preCombineField,
PARTITIONPATH_FIELD.key -> partitionFieldsStr,
PAYLOAD_CLASS_NAME.key -> payloadClassName,
- RECORD_MERGER_IMPLS.key ->
hoodieProps.getString(HoodieWriteConfig.RECORD_MERGER_IMPLS.key,
HoodieWriteConfig.RECORD_MERGER_IMPLS.defaultValue),
- ENABLE_ROW_WRITER.key -> enableBulkInsert.toString,
HoodieWriteConfig.COMBINE_BEFORE_INSERT.key ->
String.valueOf(hasPrecombineColumn),
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
HoodieSyncConfig.META_SYNC_ENABLED.key ->
hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
@@ -204,26 +197,20 @@ trait ProvidesHoodieConfig extends Logging {
HoodieSyncConfig.META_SYNC_TABLE_NAME.key ->
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key ->
hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key ->
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
- HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key ->
hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"),
- HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key ->
hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
SqlKeyGenerator.PARTITION_SCHEMA ->
hoodieCatalogTable.partitionSchema.toDDL
)
- .filter { case (_, v) => v != null }
}
}
- def buildHoodieDropPartitionsConfig(
- sparkSession: SparkSession,
- hoodieCatalogTable: HoodieCatalogTable,
- partitionsToDrop: String): Map[String,
String] = {
+ def buildHoodieDropPartitionsConfig(sparkSession: SparkSession,
+ hoodieCatalogTable: HoodieCatalogTable,
+ partitionsToDrop: String): Map[String,
String] = {
val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
- val catalogProperties = hoodieCatalogTable.catalogProperties
val tableConfig = hoodieCatalogTable.tableConfig
- val hoodieProps = getHoodieProps(catalogProperties, tableConfig,
sparkSession.sqlContext.conf)
- val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
+ val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable,
tableConfig)
- withSparkConf(sparkSession, catalogProperties) {
+ withCombinedOptions(hoodieCatalogTable, tableConfig,
sparkSession.sqlContext.conf) {
Map(
"path" -> hoodieCatalogTable.tableLocation,
TBL_NAME.key -> hoodieCatalogTable.tableName,
@@ -242,14 +229,12 @@ trait ProvidesHoodieConfig extends Logging {
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key ->
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
)
- .filter { case (_, v) => v != null }
}
}
def buildHoodieDeleteTableConfig(hoodieCatalogTable: HoodieCatalogTable,
sparkSession: SparkSession): Map[String,
String] = {
val path = hoodieCatalogTable.tableLocation
- val catalogProperties = hoodieCatalogTable.catalogProperties
val tableConfig = hoodieCatalogTable.tableConfig
val tableSchema = hoodieCatalogTable.tableSchema
val partitionColumns =
tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase(Locale.ROOT))
@@ -258,14 +243,9 @@ trait ProvidesHoodieConfig extends Logging {
assert(hoodieCatalogTable.primaryKeys.nonEmpty,
s"There are no primary key defined in table
${hoodieCatalogTable.table.identifier}, cannot execute delete operation")
- val hoodieProps = getHoodieProps(catalogProperties, tableConfig,
sparkSession.sqlContext.conf)
- val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
+ val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable,
tableConfig)
- val options = hoodieCatalogTable.catalogProperties
- val enableHive = isUsingHiveCatalog(sparkSession)
- val partitionFields = hoodieCatalogTable.partitionFields.mkString(",")
-
- withSparkConf(sparkSession, options) {
+ withCombinedOptions(hoodieCatalogTable, tableConfig,
sparkSession.sqlContext.conf) {
Map(
"path" -> path,
RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
@@ -282,24 +262,20 @@ trait ProvidesHoodieConfig extends Logging {
HoodieSyncConfig.META_SYNC_DATABASE_NAME.key ->
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
HoodieSyncConfig.META_SYNC_TABLE_NAME.key ->
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key ->
hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
- HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
+ HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key ->
hoodieCatalogTable.partitionFields.mkString(","),
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key ->
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
- HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key ->
hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"),
SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
)
}
}
- def getHoodieProps(catalogProperties: Map[String, String], tableConfig:
HoodieTableConfig, conf: SQLConf, extraOptions: Map[String, String] =
Map.empty): TypedProperties = {
- val options: Map[String, String] = catalogProperties ++
tableConfig.getProps.asScala.toMap ++ conf.getAllConfs ++ extraOptions
- val hoodieConfig = HoodieWriterUtils.convertMapToHoodieConfig(options)
- hoodieConfig.getProps
- }
+ def buildHiveSyncConfig(sparkSession: SparkSession,
+ hoodieCatalogTable: HoodieCatalogTable,
+ tableConfig: HoodieTableConfig,
+ extraOptions: Map[String, String] = Map.empty):
HiveSyncConfig = {
+ val combinedOpts = combineOptions(hoodieCatalogTable, tableConfig,
sparkSession.sqlContext.conf, extraOptions)
+ val props = new TypedProperties(toProperties(combinedOpts))
- def buildHiveSyncConfig(
- props: TypedProperties,
- hoodieCatalogTable: HoodieCatalogTable,
- sparkSession: SparkSession = SparkSession.active): HiveSyncConfig = {
// Enable the hive sync by default if spark have enable the hive metastore.
val enableHive = isUsingHiveCatalog(sparkSession)
val hiveSyncConfig: HiveSyncConfig = new HiveSyncConfig(props)
@@ -325,3 +301,39 @@ trait ProvidesHoodieConfig extends Logging {
hiveSyncConfig
}
}
+
+object ProvidesHoodieConfig {
+
+ def filterNullValues(opts: Map[String, String]): Map[String, String] =
+ opts.filter { case (_, v) => v != null }
+
+ def withCombinedOptions(catalogTable: HoodieCatalogTable,
+ tableConfig: HoodieTableConfig,
+ sqlConf: SQLConf)(optionOverrides: Map[String,
String] = Map.empty): Map[String, String] = {
+ combineOptions(catalogTable, tableConfig, sqlConf, optionOverrides)
+ }
+
+ private def combineOptions(catalogTable: HoodieCatalogTable,
+ tableConfig: HoodieTableConfig,
+ sqlConf: SQLConf,
+ optionOverrides: Map[String, String] =
Map.empty): Map[String, String] = {
+ // NOTE: Properties are merged in the following order of priority (first
has the highest priority, last has the
+ // lowest, which is inverse to the ordering in the code):
+ // 1. (Extra) Option overrides
+ // 2. Spark SQL configs
+ // 3. Persisted Hudi's Table configs
+ // 4. Table's properties in Spark Catalog
+ // 5. Global DFS properties
+ DFSPropertiesConfiguration.getGlobalProps.asScala.toMap ++
+ // NOTE: Catalog table provided t/h `TBLPROPERTIES` clause might contain
Spark SQL specific
+ // properties that need to be mapped into Hudi's conventional ones
+ mapSqlOptionsToDataSourceWriteConfigs(catalogTable.catalogProperties) ++
+ tableConfig.getProps.asScala.toMap ++
+ filterHoodieConfigs(sqlConf.getAllConfs) ++
+ filterNullValues(optionOverrides)
+ }
+
+ private def filterHoodieConfigs(opts: Map[String, String]): Map[String,
String] =
+ opts.filterKeys(isHoodieConfigKey)
+
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
index 1f8d0095301..0e19514e28c 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableAsSelectCommand.scala
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sql.InsertMode
import org.apache.hudi.sync.common.util.ConfigUtils
@@ -92,13 +93,11 @@ case class CreateHoodieTableAsSelectCommand(
hoodieCatalogTable.initHoodieTable()
val tableProperties = hoodieCatalogTable.catalogProperties
- // NOTE: Users might be specifying write-configuration (inadvertently)
as options or table properties
- // in CTAS, therefore we need to make sure that these are
appropriately propagated to the
- // write operation
- val options = tableProperties ++ Map(
+ val options = Map(
HiveSyncConfigHolder.HIVE_CREATE_MANAGED_TABLE.key -> (table.tableType
== CatalogTableType.MANAGED).toString,
HiveSyncConfigHolder.HIVE_TABLE_SERDE_PROPERTIES.key ->
ConfigUtils.configToString(tableProperties.asJava),
HiveSyncConfigHolder.HIVE_TABLE_PROPERTIES.key ->
ConfigUtils.configToString(updatedTable.properties.asJava),
+ HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> "false",
DataSourceWriteOptions.SQL_INSERT_MODE.key ->
InsertMode.NON_STRICT.value(),
DataSourceWriteOptions.SQL_ENABLE_BULK_INSERT.key -> "true"
)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 7befb97c7b8..9099c7225e0 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -20,11 +20,10 @@ package org.apache.spark.sql.hudi.command
import org.apache.avro.Schema
import org.apache.hudi.AvroConversionUtils.convertStructTypeToAvroSchema
import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.model.HoodieAvroRecordMerger
+import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.config.HoodieWriteConfig.{AVRO_SCHEMA_VALIDATE_ENABLE,
TBL_NAME}
-import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.hive.HiveSyncConfigHolder
import org.apache.hudi.sync.common.HoodieSyncConfig
@@ -38,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias,
Attribute, AttributeRef
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
import org.apache.spark.sql.hudi.HoodieSqlUtils.getMergeIntoTargetTableId
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig.withCombinedOptions
import
org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.CoercedAttributeReference
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
@@ -491,8 +491,6 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
val targetTableDb = targetTableIdentify.database.getOrElse("default")
val targetTableName = targetTableIdentify.identifier
val path = hoodieCatalogTable.tableLocation
- // force to use ExpressionPayload as WRITE_PAYLOAD_CLASS_NAME in
MergeIntoHoodieTableCommand
- val catalogProperties = hoodieCatalogTable.catalogProperties +
(PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName)
val tableConfig = hoodieCatalogTable.tableConfig
val tableSchema = hoodieCatalogTable.tableSchema
val partitionColumns =
tableConfig.getPartitionFieldProp.split(",").map(_.toLowerCase)
@@ -503,10 +501,9 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
// TODO(HUDI-3456) clean up
val preCombineField = hoodieCatalogTable.preCombineKey.getOrElse("")
- val hoodieProps = getHoodieProps(catalogProperties, tableConfig,
sparkSession.sqlContext.conf)
- val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
+ val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable,
tableConfig)
- withSparkConf(sparkSession, catalogProperties) {
+ withCombinedOptions(hoodieCatalogTable, tableConfig,
sparkSession.sqlContext.conf) {
Map(
"path" -> path,
RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
@@ -525,10 +522,8 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key ->
hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key ->
tableConfig.getPartitionFieldProp,
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key ->
hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
- HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key ->
hoodieProps.getString(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key, "200"),
// set the default parallelism to 200 for sql
- HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key ->
hoodieProps.getString(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key, "200"),
- HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key ->
hoodieProps.getString(HoodieWriteConfig.DELETE_PARALLELISM_VALUE.key, "200"),
SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL,
+ PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
// NOTE: We have to explicitly override following configs to make sure
no schema validation is performed
// as schema of the incoming dataset might be diverging from the
table's schema (full schemas'
@@ -539,7 +534,6 @@ case class MergeIntoHoodieTableCommand(mergeInto:
MergeIntoTable) extends Hoodie
RECONCILE_SCHEMA.key -> "false",
"hoodie.datasource.write.schema.canonicalize" -> "false"
)
- .filter { case (_, v) => v != null }
}
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala
index f237a1dcbb4..513f40a4c8c 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HiveSyncProcedure.scala
@@ -84,8 +84,7 @@ class HiveSyncProcedure extends BaseProcedure with
ProcedureBuilder
hiveConf.addResource(hadoopConf)
val tableConfig = hoodieCatalogTable.tableConfig
- val hoodieProps = getHoodieProps(hoodieCatalogTable.catalogProperties,
tableConfig, sqlConf)
- val hiveSyncConfig = buildHiveSyncConfig(hoodieProps, hoodieCatalogTable)
+ val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable,
tableConfig)
var hiveSyncTool: HiveSyncTool = null
try {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
index cf37f7436fd..6beed5cfc39 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala
@@ -23,11 +23,14 @@ import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.config.HoodieStorageConfig
import org.apache.hudi.common.model.HoodieAvroRecordMerger
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.exception.ExceptionUtil.getRootCause
import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.checkMessageContains
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.util.Utils
import org.joda.time.DateTimeZone
@@ -144,8 +147,11 @@ class HoodieSparkSqlTestBase extends FunSuite with
BeforeAndAfterAll {
try {
spark.sql(sql)
} catch {
- case e: Throwable if e.getMessage.trim.contains(errorMsg.trim) =>
hasException = true
- case f: Throwable => fail("Exception should contain: " + errorMsg + ",
error message: " + f.getMessage, f)
+ case e: Throwable if checkMessageContains(e, errorMsg) ||
checkMessageContains(getRootCause(e), errorMsg) =>
+ hasException = true
+
+ case f: Throwable =>
+ fail("Exception should contain: " + errorMsg + ", error message: " +
f.getMessage, f)
}
assertResult(true)(hasException)
}
@@ -219,3 +225,19 @@ class HoodieSparkSqlTestBase extends FunSuite with
BeforeAndAfterAll {
}
}
}
+
+object HoodieSparkSqlTestBase {
+
+ def getLastCommitMetadata(spark: SparkSession, tablePath: String) = {
+ val metaClient = HoodieTableMetaClient.builder()
+ .setConf(spark.sparkContext.hadoopConfiguration)
+ .setBasePath(tablePath)
+ .build()
+
+
metaClient.getActiveTimeline.getLastCommitMetadataWithValidData.get.getRight
+ }
+
+ private def checkMessageContains(e: Throwable, text: String): Boolean =
+ e.getMessage.trim.contains(text.trim)
+
+}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
index 9fc910c4f1f..120c6adb6cd 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCreateTable.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieSparkUtils
-import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.model.{HoodieRecord, WriteOperationType}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.PartitionPathEncodeUtils.escapePathName
import org.apache.hudi.config.HoodieWriteConfig
@@ -28,6 +28,7 @@ import org.apache.hudi.keygen.{ComplexKeyGenerator,
NonpartitionedKeyGenerator,
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCommitMetadata
import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.assertFalse
@@ -299,6 +300,10 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
| AS
| select 1 as id, 'a1' as name, 10 as price, 1000 as ts
""".stripMargin)
+
+ assertResult(WriteOperationType.BULK_INSERT) {
+ getLastCommitMetadata(spark,
s"${tmp.getCanonicalPath}/$tableName1").getOperationType
+ }
checkAnswer(s"select id, name, price, ts from $tableName1")(
Seq(1, "a1", 10.0, 1000)
)
@@ -318,6 +323,10 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
| select 1 as id, 'a1' as name, 10 as price, '2021-04-01' as dt
""".stripMargin
)
+
+ assertResult(WriteOperationType.BULK_INSERT) {
+ getLastCommitMetadata(spark,
s"${tmp.getCanonicalPath}/$tableName2").getOperationType
+ }
checkAnswer(s"select id, name, price, dt from $tableName2")(
Seq(1, "a1", 10, "2021-04-01")
)
@@ -356,9 +365,14 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
| price
""".stripMargin
)
+
+ assertResult(WriteOperationType.BULK_INSERT) {
+ getLastCommitMetadata(spark,
s"${tmp.getCanonicalPath}/$tableName3").getOperationType
+ }
checkAnswer(s"select id, name, price, cast(dt as string) from
$tableName3")(
Seq(1, "a1", 10, "2021-05-06 00:00:00")
)
+
// Create table with date type partition
val tableName4 = generateTableName
spark.sql(
@@ -375,6 +389,10 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
| price
""".stripMargin
)
+
+ assertResult(WriteOperationType.BULK_INSERT) {
+ getLastCommitMetadata(spark,
s"${tmp.getCanonicalPath}/$tableName4").getOperationType
+ }
checkAnswer(s"select id, name, price, cast(dt as string) from
$tableName4")(
Seq(1, "a1", 10, "2021-05-06")
)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index 868a96ebc2c..a227a20b8b6 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -22,11 +22,13 @@ import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.table.{HoodieTableMetaClient,
TableSchemaResolver}
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
+import org.apache.hudi.common.model.WriteOperationType
import
org.apache.spark.sql.hudi.command.HoodieSparkValidateDuplicateKeyRecordMerger
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieDuplicateKeyException
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCommitMetadata
import java.io.File
@@ -725,13 +727,20 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
spark.sql("set hoodie.sql.bulk.insert.enable = true")
spark.sql(s"insert into $tableName values(1, 'a1', 10,
'2021-07-18')")
+ assertResult(WriteOperationType.BULK_INSERT) {
+ getLastCommitMetadata(spark,
s"${tmp.getCanonicalPath}/$tableName").getOperationType
+ }
checkAnswer(s"select id, name, price, dt from $tableName")(
Seq(1, "a1", 10.0, "2021-07-18")
)
+
// Disable the bulk insert
spark.sql("set hoodie.sql.bulk.insert.enable = false")
spark.sql(s"insert into $tableName values(2, 'a2', 10,
'2021-07-18')")
+ assertResult(WriteOperationType.INSERT) {
+ getLastCommitMetadata(spark,
s"${tmp.getCanonicalPath}/$tableName").getOperationType
+ }
checkAnswer(s"select id, name, price, dt from $tableName order by
id")(
Seq(1, "a1", 10.0, "2021-07-18"),
Seq(2, "a2", 10.0, "2021-07-18")