yihua commented on code in PR #11943:
URL: https://github.com/apache/hudi/pull/11943#discussion_r1809674790
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java:
##########
@@ -111,8 +111,8 @@ public AbstractRealtimeRecordReader(RealtimeSplit split,
JobConf job) {
}
private boolean usesCustomPayload(HoodieTableMetaClient metaClient) {
- return
!(metaClient.getTableConfig().getPayloadClass().contains(HoodieAvroPayload.class.getName())
- ||
metaClient.getTableConfig().getPayloadClass().contains(OverwriteWithLatestAvroPayload.class.getName()));
+ return
metaClient.getTableConfig().getRecordMergeMode().equals(RecordMergeMode.CUSTOM)
+ &&
metaClient.getTableConfig().getRecordMergerStrategy().equals(HoodieRecordMerger.PAYLOAD_BASED_MERGER_STRATEGY_UUID);
Review Comment:
Does this mean that if the user wants to migrate from using the payload
class to the merger implementation class, the merger strategy ID needs to be
changed?
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java:
##########
@@ -157,8 +162,30 @@ public ArrayWritable convertAvroRecord(IndexedRecord
avroRecord) {
}
@Override
- public HoodieRecordMerger getRecordMerger(String mergerStrategy) {
- return HoodieHiveRecordMerger.getRecordMerger(mergerStrategy);
+ public GenericRecord convertToAvroRecord(ArrayWritable record, Schema
schema) {
+ return objectInspectorCache.serialize(record, schema);
+ }
+
+ @Override
+ public Option<HoodieRecordMerger> getRecordMerger(RecordMergeMode mergeMode,
String mergerStrategy, String mergerImpls) {
+ // TODO(HUDI-7843):
+ // get rid of event time and overwrite with latest. Just return
Option.empty
+ switch (mergeMode) {
+ case EVENT_TIME_ORDERING:
+ return Option.of(new DefaultHiveRecordMerger());
+ case OVERWRITE_WITH_LATEST:
+ return Option.of(new OverwriteWithLatestHiveRecordMerger());
+ case CUSTOM:
+ default:
+ if
(mergerStrategy.equals(HoodieRecordMerger.PAYLOAD_BASED_MERGER_STRATEGY_UUID)) {
+ return Option.of(HoodieAvroRecordMerger.INSTANCE);
+ }
+ Option<HoodieRecordMerger> returnVal =
HoodieRecordUtils.createValidRecordMerger(EngineType.JAVA, mergerImpls,
mergerStrategy);
Review Comment:
```suggestion
Option<HoodieRecordMerger> mergerClass =
HoodieRecordUtils.createValidRecordMerger(EngineType.JAVA, mergerImpls,
mergerStrategy);
```
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala:
##########
@@ -167,9 +166,21 @@ object HoodieWriterUtils {
if (!isOverWriteMode) {
val resolver = spark.sessionState.conf.resolver
val diffConfigs = StringBuilder.newBuilder
+ val payloadIsExpressionPayload =
params.getOrElse(PAYLOAD_CLASS_NAME.key(),
"").equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload")
Review Comment:
Define immutable string constants for the payload class names instead of
using inline string literals.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -324,7 +323,9 @@ class HoodieSparkSqlWriterInternal {
.setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
.setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS))
.setCommitTimezone(timelineTimeZone)
-
.setRecordMergerStrategy(hoodieConfig.getStringOrDefault(DataSourceWriteOptions.RECORD_MERGER_STRATEGY))
+ .setPayloadClassName(payloadClass)
+ .setRecordMergerStrategy(recordMergerStrategy)
Review Comment:
```suggestion
.setRecordMergerStrategyId(recordMergerStrategyId)
```
##########
hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java:
##########
@@ -193,7 +194,8 @@ public static HoodieWriteConfig createHoodieConfig(String
schemaStr, String base
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withInlineCompaction(inlineCompact).build())
.withPayloadConfig(HoodiePayloadConfig.newBuilder()
-
.withPayloadClass(parameters.get(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key()))
+
.withPayloadClass(parameters.getOrDefault(DataSourceWriteOptions.PAYLOAD_CLASS_NAME().key(),
Review Comment:
Have you checked all other places that use `PAYLOAD_CLASS_NAME` to make
sure they also get the payload class name from the table config?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala:
##########
@@ -167,9 +166,21 @@ object HoodieWriterUtils {
if (!isOverWriteMode) {
val resolver = spark.sessionState.conf.resolver
val diffConfigs = StringBuilder.newBuilder
+ val payloadIsExpressionPayload =
params.getOrElse(PAYLOAD_CLASS_NAME.key(),
"").equals("org.apache.spark.sql.hudi.command.payload.ExpressionPayload")
params.foreach { case (key, value) =>
+ var ignoreConfig = false
// Base file format can change between writes, so ignore it.
- if (!HoodieTableConfig.BASE_FILE_FORMAT.key.equals(key)) {
+ ignoreConfig = ignoreConfig ||
HoodieTableConfig.BASE_FILE_FORMAT.key.equals(key)
+
+ //expression payload will never be the table config so skip validation
of merge configs
+ ignoreConfig = ignoreConfig || (payloadIsExpressionPayload &&
(key.equals(PAYLOAD_CLASS_NAME.key())
+ || key.equals(HoodieTableConfig.PAYLOAD_CLASS_NAME.key()) ||
key.equals(RECORD_MERGE_MODE.key())
+ || key.equals(RECORD_MERGER_STRATEGY_ID.key())))
+
+ //don't validate the payload only in the case that insert into is
using fallback to some legacy configs
+ ignoreConfig = ignoreConfig || (key.equals(PAYLOAD_CLASS_NAME.key())
&&
value.equals("org.apache.spark.sql.hudi.command.ValidateDuplicateKeyPayload"))
Review Comment:
Same here.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -400,7 +398,10 @@ object DataSourceWriteOptions {
/**
* Id of merger strategy
*/
- val RECORD_MERGER_STRATEGY = HoodieWriteConfig.RECORD_MERGER_STRATEGY
+ val RECORD_MERGER_STRATEGY_ID = HoodieWriteConfig.RECORD_MERGER_STRATEGY_ID
+
+
Review Comment:
Remove the extra empty line.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]