yihua commented on code in PR #11943:
URL: https://github.com/apache/hudi/pull/11943#discussion_r1809692780
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala:
##########
@@ -112,7 +112,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
metadataConfig,
// TODO support CDC with spark record
recordMergerImpls = List(classOf[HoodieAvroRecordMerger].getName),
- recordMergerStrategy = HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID
+ recordMergerStrategy = HoodieRecordMerger.PAYLOAD_BASED_MERGER_STRATEGY_UUID
Review Comment:
Looks like there are places where the config passing can be improved later.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -284,8 +286,23 @@ trait ProvidesHoodieConfig extends Logging {
}
}
+ val (recordMergeMode, recordMergeStrategy) = if (payloadClassName.equals(classOf[ValidateDuplicateKeyPayload].getCanonicalName)) {
Review Comment:
To be differentiated with the user-set payload class:
```suggestion
val (recordMergeMode, recordMergeStrategy) = if (deducedPayloadClassName.equals(classOf[ValidateDuplicateKeyPayload].getCanonicalName)) {
```
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala:
##########
@@ -284,8 +286,23 @@ trait ProvidesHoodieConfig extends Logging {
}
}
+ val (recordMergeMode, recordMergeStrategy) = if (payloadClassName.equals(classOf[ValidateDuplicateKeyPayload].getCanonicalName)) {
+ (RecordMergeMode.CUSTOM.name(), HoodieRecordMerger.PAYLOAD_BASED_MERGER_STRATEGY_UUID)
+ } else {
+ (RecordMergeMode.EVENT_TIME_ORDERING.name(), HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
+ }
+
+ if (tableConfig.getPayloadClass.equals(classOf[DefaultHoodieRecordPayload].getCanonicalName) &&
+ tableConfig.getRecordMergeMode.equals(RecordMergeMode.EVENT_TIME_ORDERING)) {
+ tableConfig.clearValue(HoodieTableConfig.PAYLOAD_CLASS_NAME)
Review Comment:
Is this modified table config only used here? Does it have any side effect?
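The side-effect concern above comes down to reference sharing: `tableConfig` is a single mutable object, so clearing a key on it is visible to every other component holding the same reference. A minimal Java sketch of that behavior, using a plain `java.util.Properties` as a hypothetical stand-in for the table config object (the key string here is illustrative, not the exact Hudi key):

```java
import java.util.Properties;

public class SharedConfigSideEffect {
    public static void main(String[] args) {
        // One Properties object standing in for the table config
        // (hypothetical stand-in; the real HoodieTableConfig wraps typed properties).
        Properties tableConfig = new Properties();
        tableConfig.setProperty("hoodie.compaction.payload.class",
                "org.apache.hudi.common.model.DefaultHoodieRecordPayload");

        // Two components hold references to the SAME underlying object.
        Properties writerView = tableConfig;
        Properties readerView = tableConfig;

        // "Clearing" the value through one reference...
        writerView.remove("hoodie.compaction.payload.class");

        // ...is immediately visible through the other reference: this is the
        // side effect the review comment is asking about.
        System.out.println(readerView.getProperty("hoodie.compaction.payload.class")); // prints null
    }
}
```

If the cleared value should only apply locally, copying the properties before mutating avoids leaking the change to other readers of the same config object.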
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieOptionConfig.scala:
##########
@@ -67,21 +68,18 @@ object HoodieOptionConfig {
.withSqlKey("payloadClass")
.withHoodieKey(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key)
.withTableConfigKey(HoodieTableConfig.PAYLOAD_CLASS_NAME.key)
- .defaultValue(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.defaultValue())
.build()
- val SQL_PAYLOAD_TYPE: HoodieSQLOption[String] = buildConf()
- .withSqlKey("payloadType")
- .withHoodieKey(DataSourceWriteOptions.PAYLOAD_TYPE.key)
- .withTableConfigKey(HoodieTableConfig.PAYLOAD_TYPE.key)
- .defaultValue(DataSourceWriteOptions.PAYLOAD_TYPE.defaultValue())
+ val SQL_MERGE_MODE: HoodieSQLOption[String] = buildConf()
+ .withSqlKey("mergeMode")
+ .withHoodieKey(HoodieWriteConfig.RECORD_MERGE_MODE.key)
+ .withTableConfigKey(HoodieTableConfig.RECORD_MERGE_MODE.key)
.build()
val SQL_RECORD_MERGER_STRATEGY: HoodieSQLOption[String] = buildConf()
.withSqlKey("recordMergerStrategy")
- .withHoodieKey(DataSourceWriteOptions.RECORD_MERGER_STRATEGY.key)
+ .withHoodieKey(DataSourceWriteOptions.RECORD_MERGER_STRATEGY_ID.key)
Review Comment:
Change `HoodieTableConfig.RECORD_MERGER_STRATEGY` to `HoodieTableConfig.RECORD_MERGER_STRATEGY_ID` without changing the config key?
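The suggestion above is to rename only the Java constant while keeping the serialized key string, which preserves compatibility with existing tables. A minimal sketch of why that works, with a hypothetical key string (not necessarily the exact key Hudi persists):

```java
import java.util.Properties;

public class RenameConstantKeepKey {
    // After the rename: the Java identifier changed, but the key string that
    // gets written into hoodie.properties is unchanged (illustrative key).
    static final String RECORD_MERGER_STRATEGY_ID = "hoodie.compaction.record.merger.strategy";

    public static void main(String[] args) {
        Properties storedTableConfig = new Properties();
        // A table created before the rename persisted this exact key on disk.
        storedTableConfig.setProperty("hoodie.compaction.record.merger.strategy", "uuid-1234");

        // Code compiled after the rename reads through the new constant name
        // and still resolves the old value, because only the identifier changed.
        System.out.println(storedTableConfig.getProperty(RECORD_MERGER_STRATEGY_ID)); // prints uuid-1234
    }
}
```

Changing the key string itself, by contrast, would require a fallback/alias mechanism so that tables written with the old key still resolve.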
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]