This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 91a9c821d2f04bd2e3233930eb8db9467dc12b26
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Thu Feb 2 04:20:17 2023 -0800

    [HUDI-5684] Fix CTAS and Insert Into to avoid combine-on-insert by default (#7813)

    * Remove `COMBINE_BEFORE_INSERT` config being overridden for insert operations

    * Revisited Spark SQL feature configuration to allow dichotomy of having:
      - (Feature-)specific "default" configuration (that could be overridden by the user)
      - "Overriding" configuration (that could NOT be overridden by the user)

    * Restoring existing behavior for Insert Into to deduplicate by default (if pre-combine is specified)

    * Fixing compilation

    * Fixing compilation (one more time)

    * Fixing options combination ordering
---
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      | 236 +++++++++++----------
 .../hudi/command/MergeIntoHoodieTableCommand.scala |  67 +++---
 2 files changed, 163 insertions(+), 140 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 77ff939cf26..c8f01a12623 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hudi.HoodieOptionConfig.mapSqlOptionsToDataSourceWriteConfigs
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isHoodieConfigKey, isUsingHiveCatalog}
-import org.apache.spark.sql.hudi.ProvidesHoodieConfig.{combineOptions, withCombinedOptions}
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig.combineOptions
 import org.apache.spark.sql.hudi.command.{SqlKeyGenerator, ValidateDuplicateKeyPayload}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -60,29 +60,33 @@ trait ProvidesHoodieConfig extends Logging {
 
     val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)
 
-    withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) {
-      Map.apply(
-        "path" -> hoodieCatalogTable.tableLocation,
-        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
-        TBL_NAME.key -> hoodieCatalogTable.tableName,
-        PRECOMBINE_FIELD.key -> preCombineField,
-        HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
-        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
-        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-        SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
-        OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
-        PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
-        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
-        HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
-        SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
-      )
-    }
+    val defaultOpts = Map[String, String](
+      KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+      OPERATION.key -> UPSERT_OPERATION_OPT_VAL
+    )
+
+    val overridingOpts = Map[String, String](
+      "path" -> hoodieCatalogTable.tableLocation,
+      RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+      TBL_NAME.key -> hoodieCatalogTable.tableName,
+      PRECOMBINE_FIELD.key -> preCombineField,
+      HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+      URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+      SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL,
+      PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
+      HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
+      HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
+      HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
+      HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
+      HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
+      HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString
+    )
+
+    combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+      defaultOpts = defaultOpts, overridingOpts = overridingOpts)
   }
 
   /**
@@ -107,7 +111,8 @@ trait ProvidesHoodieConfig extends Logging {
     val tableType = hoodieCatalogTable.tableTypeName
     val tableConfig = hoodieCatalogTable.tableConfig
 
-    val combinedOpts = combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf, extraOptions)
+    val combinedOpts: Map[String, String] = combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+      defaultOpts = Map.empty, overridingOpts = extraOptions)
     val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig, extraOptions)
 
     val partitionFieldsStr = hoodieCatalogTable.partitionFields.mkString(",")
@@ -174,32 +179,38 @@ trait ProvidesHoodieConfig extends Logging {
       classOf[OverwriteWithLatestAvroPayload].getCanonicalName
     }
 
-    withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) {
-      Map(
-        "path" -> path,
-        TABLE_TYPE.key -> tableType,
-        TBL_NAME.key -> hoodieCatalogTable.tableName,
-        OPERATION.key -> operation,
-        HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
-        URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
-        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-        SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
-        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
-        PRECOMBINE_FIELD.key -> preCombineField,
-        PARTITIONPATH_FIELD.key -> partitionFieldsStr,
-        PAYLOAD_CLASS_NAME.key -> payloadClassName,
-        HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),
-        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
-        HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
-        SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL
-      )
-    }
+    val defaultOpts = Map(
+      PAYLOAD_CLASS_NAME.key -> payloadClassName,
+      // NOTE: By default insert would try to do deduplication in case that pre-combine column is specified
+      //       for the table
+      HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn)
+    )
+
+    val overridingOpts = Map(
+      "path" -> path,
+      TABLE_TYPE.key -> tableType,
+      TBL_NAME.key -> hoodieCatalogTable.tableName,
+      OPERATION.key -> operation,
+      HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
+      URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
+      KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
+      SqlKeyGenerator.PARTITION_SCHEMA -> hoodieCatalogTable.partitionSchema.toDDL,
+      RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+      PRECOMBINE_FIELD.key -> preCombineField,
+      PARTITIONPATH_FIELD.key -> partitionFieldsStr,
+      HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
+      HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
+      HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
+      HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
+      HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
+      HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
+    )
+
+    combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+      defaultOpts = defaultOpts, overridingOpts = overridingOpts)
   }
 
   def buildHoodieDropPartitionsConfig(sparkSession: SparkSession,
@@ -210,26 +221,27 @@ trait ProvidesHoodieConfig extends Logging {
 
     val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)
 
-    withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) {
-      Map(
-        "path" -> hoodieCatalogTable.tableLocation,
-        TBL_NAME.key -> hoodieCatalogTable.tableName,
-        TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName,
-        OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
-        PARTITIONS_TO_DELETE.key -> partitionsToDrop,
-        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
-        PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
-        PARTITIONPATH_FIELD.key -> partitionFields,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
-        HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
-        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
-      )
-    }
+    val overridingOpts = Map(
+      "path" -> hoodieCatalogTable.tableLocation,
+      TBL_NAME.key -> hoodieCatalogTable.tableName,
+      TABLE_TYPE.key -> hoodieCatalogTable.tableTypeName,
+      OPERATION.key -> DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL,
+      PARTITIONS_TO_DELETE.key -> partitionsToDrop,
+      RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+      PRECOMBINE_FIELD.key -> hoodieCatalogTable.preCombineKey.getOrElse(""),
+      PARTITIONPATH_FIELD.key -> partitionFields,
+      HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
+      HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
+      HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
+      HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
+      HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFields,
+      HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
+    )
+
+    combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+      defaultOpts = Map.empty, overridingOpts = overridingOpts)
   }
 
   def buildHoodieDeleteTableConfig(hoodieCatalogTable: HoodieCatalogTable,
@@ -245,35 +257,37 @@ trait ProvidesHoodieConfig extends Logging {
 
     val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)
 
-    withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) {
-      Map(
-        "path" -> path,
-        RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
-        TBL_NAME.key -> tableConfig.getTableName,
-        HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
-        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
-        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-        SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
-        OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
-        PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
-        HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
-        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> hoodieCatalogTable.partitionFields.mkString(","),
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
-        SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL
-      )
-    }
+    val overridingOpts = Map(
+      "path" -> path,
+      RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+      TBL_NAME.key -> tableConfig.getTableName,
+      HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+      URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+      KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+      SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL,
+      OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
+      PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
+      HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE, HiveSyncMode.HMS.name()),
+      HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_DATABASE_NAME),
+      HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_TABLE_NAME),
+      HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
+      HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> hoodieCatalogTable.partitionFields.mkString(","),
+      HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
+    )
+
+    combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+      defaultOpts = Map.empty, overridingOpts = overridingOpts)
   }
 
   def buildHiveSyncConfig(sparkSession: SparkSession,
                           hoodieCatalogTable: HoodieCatalogTable,
                           tableConfig: HoodieTableConfig,
                           extraOptions: Map[String, String] = Map.empty): HiveSyncConfig = {
-    val combinedOpts = combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf, extraOptions)
+    val combinedOpts = combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+      defaultOpts = Map.empty, overridingOpts = extraOptions)
     val props = new TypedProperties(toProperties(combinedOpts))
 
     // Enable the hive sync by default if spark have enable the hive metastore.
@@ -304,19 +318,22 @@ trait ProvidesHoodieConfig extends Logging {
 
 object ProvidesHoodieConfig {
 
-  def filterNullValues(opts: Map[String, String]): Map[String, String] =
-    opts.filter { case (_, v) => v != null }
-
-  def withCombinedOptions(catalogTable: HoodieCatalogTable,
-                          tableConfig: HoodieTableConfig,
-                          sqlConf: SQLConf)(optionOverrides: Map[String, String] = Map.empty): Map[String, String] = {
-    combineOptions(catalogTable, tableConfig, sqlConf, optionOverrides)
-  }
-
-  private def combineOptions(catalogTable: HoodieCatalogTable,
+  // NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
+  //
+  // Spark SQL operations configuration might be coming from a variety of diverse sources
+  // that have to be ultimately combined under clear and consistent process:
+  //
+  //    - Default: specify default values preferred by the feature/component (could be
+  //      overridden by any source)
+  //
+  //    - Overriding: specify mandatory values required for the feature/component (could NOT be
+  //      overridden by any source)s
+  //
+  def combineOptions(catalogTable: HoodieCatalogTable,
                      tableConfig: HoodieTableConfig,
                      sqlConf: SQLConf,
-                     optionOverrides: Map[String, String] = Map.empty): Map[String, String] = {
+                     defaultOpts: Map[String, String],
+                     overridingOpts: Map[String, String] = Map.empty): Map[String, String] = {
     // NOTE: Properties are merged in the following order of priority (first has the highest priority, last has the
     //       lowest, which is inverse to the ordering in the code):
     //          1. (Extra) Option overrides
@@ -324,15 +341,20 @@ object ProvidesHoodieConfig {
     //          3. Persisted Hudi's Table configs
    //          4. Table's properties in Spark Catalog
     //          5. Global DFS properties
-    DFSPropertiesConfiguration.getGlobalProps.asScala.toMap ++
+    //          6. (Feature-specific) Default values
+    filterNullValues(defaultOpts) ++
+      DFSPropertiesConfiguration.getGlobalProps.asScala.toMap ++
       // NOTE: Catalog table provided t/h `TBLPROPERTIES` clause might contain Spark SQL specific
       //       properties that need to be mapped into Hudi's conventional ones
       mapSqlOptionsToDataSourceWriteConfigs(catalogTable.catalogProperties) ++
       tableConfig.getProps.asScala.toMap ++
       filterHoodieConfigs(sqlConf.getAllConfs) ++
-      filterNullValues(optionOverrides)
+      filterNullValues(overridingOpts)
   }
 
+  private def filterNullValues(opts: Map[String, String]): Map[String, String] =
+    opts.filter { case (_, v) => v != null }
+
   private def filterHoodieConfigs(opts: Map[String, String]): Map[String, String] =
     opts.filterKeys(isHoodieConfigKey)
 
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
index 93972b392b2..ed3f2591253 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._
 import org.apache.spark.sql.hudi.HoodieSqlUtils.getMergeIntoTargetTableId
-import org.apache.spark.sql.hudi.ProvidesHoodieConfig.withCombinedOptions
+import org.apache.spark.sql.hudi.ProvidesHoodieConfig.combineOptions
 import org.apache.spark.sql.hudi.command.MergeIntoHoodieTableCommand.CoercedAttributeReference
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload
 import org.apache.spark.sql.hudi.command.payload.ExpressionPayload._
@@ -504,38 +504,39 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
 
     val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable, tableConfig)
 
-    withCombinedOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf) {
-      Map(
-        "path" -> path,
-        RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
-        PRECOMBINE_FIELD.key -> preCombineField,
-        TBL_NAME.key -> hoodieCatalogTable.tableName,
-        PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
-        HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
-        URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
-        KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
-        SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
-        HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
-        HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE),
-        HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> targetTableDb,
-        HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> targetTableName,
-        HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
-        HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
-        HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
-        SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL,
-        PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
-
-        // NOTE: We have to explicitly override following configs to make sure no schema validation is performed
-        //       as schema of the incoming dataset might be diverging from the table's schema (full schemas'
-        //       compatibility b/w table's schema and incoming one is not necessary in this case since we can
-        //       be cherry-picking only selected columns from the incoming dataset to be inserted/updated in the
-        //       target table, ie partially updating)
-        AVRO_SCHEMA_VALIDATE_ENABLE.key -> "false",
-        RECONCILE_SCHEMA.key -> "false",
-        CANONICALIZE_NULLABLE.key -> "false"
-      )
-    }
+    val overridingOpts = Map(
+      "path" -> path,
+      RECORDKEY_FIELD.key -> tableConfig.getRecordKeyFieldProp,
+      PRECOMBINE_FIELD.key -> preCombineField,
+      TBL_NAME.key -> hoodieCatalogTable.tableName,
+      PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
+      HIVE_STYLE_PARTITIONING.key -> tableConfig.getHiveStylePartitioningEnable,
+      URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+      KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
+      SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> tableConfig.getKeyGeneratorClassName,
+      HoodieSyncConfig.META_SYNC_ENABLED.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
+      HiveSyncConfigHolder.HIVE_SYNC_MODE.key -> hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE),
+      HoodieSyncConfig.META_SYNC_DATABASE_NAME.key -> targetTableDb,
+      HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> targetTableName,
+      HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key -> hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString,
+      HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> tableConfig.getPartitionFieldProp,
+      HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS),
+      SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL,
+      PAYLOAD_CLASS_NAME.key -> classOf[ExpressionPayload].getCanonicalName,
+
+      // NOTE: We have to explicitly override following configs to make sure no schema validation is performed
+      //       as schema of the incoming dataset might be diverging from the table's schema (full schemas'
+      //       compatibility b/w table's schema and incoming one is not necessary in this case since we can
+      //       be cherry-picking only selected columns from the incoming dataset to be inserted/updated in the
+      //       target table, ie partially updating)
+      AVRO_SCHEMA_VALIDATE_ENABLE.key -> "false",
+      RECONCILE_SCHEMA.key -> "false",
+      CANONICALIZE_NULLABLE.key -> "false"
+    )
+
+    combineOptions(hoodieCatalogTable, tableConfig, sparkSession.sqlContext.conf,
+      defaultOpts = Map.empty, overridingOpts = overridingOpts)
   }
 }
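
For review convenience, the contract that `combineOptions` now encodes can be
illustrated with plain Scala maps standing in for the real sources (catalog
table properties, persisted table config, session SQLConf, and so on). This is
an editorial sketch, not Hudi code: `CombineOptionsSketch`, `combine`, and the
inlined source maps are made up for illustration, and only the two config keys
used below are actual Hudi keys. The point is simply that `++` on Scala maps
keeps the right-hand value on key collisions, so sources concatenated later
win; that is why `defaultOpts` (concatenated first) can be overridden by any
other source, while `overridingOpts` (concatenated last) cannot.

object CombineOptionsSketch {

  // Mirrors the role of ProvidesHoodieConfig.filterNullValues: drop
  // null-valued entries so they neither shadow nor poison the merged result.
  private def filterNullValues(opts: Map[String, String]): Map[String, String] =
    opts.filter { case (_, v) => v != null }

  // Sources ordered lowest to highest priority; `++` keeps the right-hand
  // operand's value on duplicate keys, so later sources win.
  def combine(defaultOpts: Map[String, String],
              globalProps: Map[String, String],
              catalogProps: Map[String, String],
              tableConfig: Map[String, String],
              sqlConf: Map[String, String],
              overridingOpts: Map[String, String]): Map[String, String] =
    filterNullValues(defaultOpts) ++
      globalProps ++
      catalogProps ++
      tableConfig ++
      sqlConf ++
      filterNullValues(overridingOpts)

  def main(args: Array[String]): Unit = {
    // Feature default (Insert Into): deduplicate only because a pre-combine
    // field happens to be set on the table.
    val defaults = Map("hoodie.combine.before.insert" -> "true")
    // A user turns deduplication off at the session level...
    val sessionConf = Map("hoodie.combine.before.insert" -> "false")
    // ...and wins, because defaults sit at the bottom of the chain:
    val merged = combine(defaults, Map.empty, Map.empty, Map.empty, sessionConf, Map.empty)
    assert(merged("hoodie.combine.before.insert") == "false")

    // An overriding option (e.g. the operation a SQL command mandates) cannot
    // be displaced by any other source, including the session conf:
    val merged2 = combine(Map.empty, Map.empty, Map.empty, Map.empty,
      Map("hoodie.datasource.write.operation" -> "insert"),
      Map("hoodie.datasource.write.operation" -> "upsert"))
    assert(merged2("hoodie.datasource.write.operation") == "upsert")
  }
}

In particular, because `HoodieWriteConfig.COMBINE_BEFORE_INSERT` now enters the
merge through `defaultOpts` rather than through the non-overridable map, a
session-level `SET hoodie.combine.before.insert=...` issued before an INSERT
INTO should take precedence over the pre-combine-derived default, which is the
user-facing point of this change.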
