This is an automated email from the ASF dual-hosted git repository.
akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 13ed95a9fd2 Make most of the Spark SQL DML operations configs
overridable (#7850)
13ed95a9fd2 is described below
commit 13ed95a9fd2e0b330f8e16cb2b08714a432a70b4
Author: Alexey Kudinkin <[email protected]>
AuthorDate: Sat Feb 4 10:54:17 2023 -0800
Make most of the Spark SQL DML operations configs overridable (#7850)
This PR makes most of the Spark SQL operations configs overridable,
leaving only a few that should not — and cannot — be overridden by
the user
---
.../spark/sql/hudi/ProvidesHoodieConfig.scala | 69 +++++++++++-----------
1 file changed, 36 insertions(+), 33 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index 0c766f5135b..ffc4079824f 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -61,20 +61,10 @@ trait ProvidesHoodieConfig extends Logging {
val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable,
tableConfig)
val defaultOpts = Map[String, String](
+ OPERATION.key -> UPSERT_OPERATION_OPT_VAL,
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
- OPERATION.key -> UPSERT_OPERATION_OPT_VAL
- )
-
- val overridingOpts = Map[String, String](
- "path" -> hoodieCatalogTable.tableLocation,
- RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
- TBL_NAME.key -> hoodieCatalogTable.tableName,
- PRECOMBINE_FIELD.key -> preCombineField,
- HIVE_STYLE_PARTITIONING.key ->
tableConfig.getHiveStylePartitioningEnable,
- URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME ->
tableConfig.getKeyGeneratorClassName,
SqlKeyGenerator.PARTITION_SCHEMA ->
hoodieCatalogTable.partitionSchema.toDDL,
- PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
HoodieSyncConfig.META_SYNC_ENABLED.key ->
hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key ->
hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
HiveSyncConfigHolder.HIVE_SYNC_MODE.key ->
hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE,
HiveSyncMode.HMS.name()),
@@ -85,6 +75,16 @@ trait ProvidesHoodieConfig extends Logging {
HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE.key ->
hiveSyncConfig.getBoolean(HiveSyncConfigHolder.HIVE_SUPPORT_TIMESTAMP_TYPE).toString
)
+ val overridingOpts = Map[String, String](
+ "path" -> hoodieCatalogTable.tableLocation,
+ RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+ TBL_NAME.key -> hoodieCatalogTable.tableName,
+ PRECOMBINE_FIELD.key -> preCombineField,
+ HIVE_STYLE_PARTITIONING.key ->
tableConfig.getHiveStylePartitioningEnable,
+ URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+ PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp
+ )
+
combineOptions(hoodieCatalogTable, tableConfig,
sparkSession.sqlContext.conf,
defaultOpts = defaultOpts, overridingOpts = overridingOpts)
}
@@ -183,22 +183,10 @@ trait ProvidesHoodieConfig extends Logging {
PAYLOAD_CLASS_NAME.key -> payloadClassName,
// NOTE: By default insert would try to do deduplication in case that
pre-combine column is specified
// for the table
- HoodieWriteConfig.COMBINE_BEFORE_INSERT.key ->
String.valueOf(hasPrecombineColumn)
- )
-
- val overridingOpts = extraOptions ++ Map(
- "path" -> path,
- TABLE_TYPE.key -> tableType,
- TBL_NAME.key -> hoodieCatalogTable.tableName,
- OPERATION.key -> operation,
- HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
- URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
+ HoodieWriteConfig.COMBINE_BEFORE_INSERT.key ->
String.valueOf(hasPrecombineColumn),
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME -> keyGeneratorClassName,
SqlKeyGenerator.PARTITION_SCHEMA ->
hoodieCatalogTable.partitionSchema.toDDL,
- RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
- PRECOMBINE_FIELD.key -> preCombineField,
- PARTITIONPATH_FIELD.key -> partitionFieldsStr,
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> partitionFieldsStr,
HoodieSyncConfig.META_SYNC_ENABLED.key ->
hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key ->
hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
@@ -209,6 +197,18 @@ trait ProvidesHoodieConfig extends Logging {
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key ->
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
)
+ val overridingOpts = extraOptions ++ Map(
+ "path" -> path,
+ TABLE_TYPE.key -> tableType,
+ TBL_NAME.key -> hoodieCatalogTable.tableName,
+ OPERATION.key -> operation,
+ HIVE_STYLE_PARTITIONING.key -> hiveStylePartitioningEnable,
+ URL_ENCODE_PARTITIONING.key -> urlEncodePartitioning,
+ RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+ PRECOMBINE_FIELD.key -> preCombineField,
+ PARTITIONPATH_FIELD.key -> partitionFieldsStr
+ )
+
combineOptions(hoodieCatalogTable, tableConfig,
sparkSession.sqlContext.conf,
defaultOpts = defaultOpts, overridingOpts = overridingOpts)
}
@@ -257,17 +257,10 @@ trait ProvidesHoodieConfig extends Logging {
val hiveSyncConfig = buildHiveSyncConfig(sparkSession, hoodieCatalogTable,
tableConfig)
- val overridingOpts = Map(
- "path" -> path,
- RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
- TBL_NAME.key -> tableConfig.getTableName,
- HIVE_STYLE_PARTITIONING.key ->
tableConfig.getHiveStylePartitioningEnable,
- URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+ val defaultOpts = Map(
KEYGENERATOR_CLASS_NAME.key -> classOf[SqlKeyGenerator].getCanonicalName,
SqlKeyGenerator.ORIGINAL_KEYGEN_CLASS_NAME ->
tableConfig.getKeyGeneratorClassName,
SqlKeyGenerator.PARTITION_SCHEMA -> partitionSchema.toDDL,
- OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
- PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp,
HoodieSyncConfig.META_SYNC_ENABLED.key ->
hiveSyncConfig.getString(HoodieSyncConfig.META_SYNC_ENABLED.key),
HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key ->
hiveSyncConfig.getString(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key),
HiveSyncConfigHolder.HIVE_SYNC_MODE.key ->
hiveSyncConfig.getStringOrDefault(HiveSyncConfigHolder.HIVE_SYNC_MODE,
HiveSyncMode.HMS.name()),
@@ -278,8 +271,18 @@ trait ProvidesHoodieConfig extends Logging {
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key ->
hiveSyncConfig.getStringOrDefault(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS)
)
+ val overridingOpts = Map(
+ "path" -> path,
+ RECORDKEY_FIELD.key -> hoodieCatalogTable.primaryKeys.mkString(","),
+ TBL_NAME.key -> tableConfig.getTableName,
+ HIVE_STYLE_PARTITIONING.key ->
tableConfig.getHiveStylePartitioningEnable,
+ URL_ENCODE_PARTITIONING.key -> tableConfig.getUrlEncodePartitioning,
+ OPERATION.key -> DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL,
+ PARTITIONPATH_FIELD.key -> tableConfig.getPartitionFieldProp
+ )
+
combineOptions(hoodieCatalogTable, tableConfig,
sparkSession.sqlContext.conf,
- defaultOpts = Map.empty, overridingOpts = overridingOpts)
+ defaultOpts = defaultOpts, overridingOpts = overridingOpts)
}
def buildHiveSyncConfig(sparkSession: SparkSession,