yihua commented on code in PR #11943:
URL: https://github.com/apache/hudi/pull/11943#discussion_r1799933278


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -105,6 +106,8 @@ protected byte[] serializeRecords(List<HoodieRecord> records, HoodieStorage stor
     Schema schema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
     GenericDatumWriter<IndexedRecord> writer = new GenericDatumWriter<>(schema);
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    Function<HoodieRecord<?>, IndexedRecord> recordConverter = HoodieIOFactory.getIOFactory(storage)
+        .toIndexedRecord(schema, new Properties(), records.get(0).getRecordType());

Review Comment:
   Could we separate this out into a different PR, assuming that the write path still uses the Avro-based merger?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -143,10 +146,9 @@ protected byte[] serializeRecords(List<HoodieRecord> records, HoodieStorage stor
   @Override
   protected <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(byte[] content, HoodieRecordType type) throws IOException {
     checkState(this.readerSchema != null, "Reader's schema has to be non-null");
-    checkArgument(type != HoodieRecordType.SPARK, "Not support read avro to spark record");
-    // TODO AvroSparkReader need
     RecordIterator iterator = RecordIterator.getInstance(this, content);
-    return new CloseableMappingIterator<>(iterator, data -> (HoodieRecord<T>) new HoodieAvroIndexedRecord(data));
+    Function<IndexedRecord, HoodieRecord<?>> recordConverter = HoodieIOFactory.getIOFactory(getBlockContentLocation().get().getStorage()).fromIndexedRecord(type);
+    return new CloseableMappingIterator<>(iterator, data -> (HoodieRecord<T>) recordConverter.apply(data));

Review Comment:
   Same here.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java:
##########
@@ -391,22 +425,70 @@ protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
           return newer;
         case CUSTOM:
         default:
-          Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.merge(
-              readerContext.constructHoodieRecord(older, olderInfoMap), (Schema) olderInfoMap.get(INTERNAL_META_SCHEMA),
-              readerContext.constructHoodieRecord(newer, newerInfoMap), (Schema) newerInfoMap.get(INTERNAL_META_SCHEMA), props);
-
-          if (mergedRecord.isPresent()
-              && !mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
-            if (!mergedRecord.get().getRight().equals(readerSchema)) {
-              return Option.ofNullable((T) mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, readerSchema).getData());
+
+          if (payloadClass.isPresent()) {
+            ValidationUtils.checkArgument(!Objects.equals(payloadClass, OverwriteWithLatestAvroPayload.class.getCanonicalName())
+                && !Objects.equals(payloadClass, DefaultHoodieRecordPayload.class.getCanonicalName()));
+            HoodieRecord oldHoodieRecord = constructHoodieAvroRecord(readerContext, older, olderInfoMap);
+            HoodieRecord newHoodieRecord = constructHoodieAvroRecord(readerContext, newer, newerInfoMap);
+            Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().merge(
+                oldHoodieRecord, getSchemaForAvroPayloadMerge(oldHoodieRecord, olderInfoMap),
+                newHoodieRecord, getSchemaForAvroPayloadMerge(newHoodieRecord, newerInfoMap), props);
+            if (mergedRecord.isPresent()
+                && !mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
+              if (!mergedRecord.get().getRight().equals(readerSchema)) {
+                return Option.ofNullable(
+                    readerContext.convertAvroRecord((IndexedRecord) mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, readerSchema).getData()));
+              }
+              return Option.ofNullable(readerContext.convertAvroRecord((IndexedRecord) mergedRecord.get().getLeft().getData()));
+            }
+          } else {
+            Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().merge(
+                readerContext.constructHoodieRecord(older, olderInfoMap), (Schema) olderInfoMap.get(INTERNAL_META_SCHEMA),
+                readerContext.constructHoodieRecord(newer, newerInfoMap), (Schema) newerInfoMap.get(INTERNAL_META_SCHEMA), props);
+            if (mergedRecord.isPresent()
+                && !mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
+              if (!mergedRecord.get().getRight().equals(readerSchema)) {
+                return Option.ofNullable((T) mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, readerSchema).getData());
+              }
+              return Option.ofNullable((T) mergedRecord.get().getLeft().getData());
             }
-            return Option.ofNullable((T) mergedRecord.get().getLeft().getData());
           }
           return Option.empty();
       }
     }
   }
 
+  /**
+   * Constructs a new {@link HoodieAvroRecord} for payload based merging
+   *
+   * @param readerContext reader context
+   * @param recordOption  An option of the record in engine-specific type if exists.
+   * @param metadataMap   The record metadata.
+   * @return A new instance of {@link HoodieRecord}.
+   */
+  private HoodieRecord constructHoodieAvroRecord(HoodieReaderContext<T> readerContext, Option<T> recordOption, Map<String, Object> metadataMap) {

Review Comment:
   We need to rethink the support for the Avro merger here.  This new code path adds more complexity.
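To illustrate the reviewer's point, here is a minimal, self-contained sketch of what the payload-based branch in the diff does. This is NOT the Hudi API: `MergeSketch`, `toAvro`, `fromAvro`, and `avroMerger` are all hypothetical names. The idea is that engine records are round-tripped through an Avro-shaped form before and after merging, which is the extra complexity being flagged.

```java
import java.util.Optional;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical sketch, NOT the Hudi API: models the payload-based merge
// branch, where both records are converted to an Avro-shaped form (A),
// merged there, and converted back to the engine type (T). Containing
// the conversions in one place is one way to keep this extra code path
// from leaking into the record buffer.
final class MergeSketch<T, A> {
  private final Function<T, A> toAvro;        // engine record -> Avro-shaped form
  private final Function<A, T> fromAvro;      // Avro-shaped form -> engine record
  private final BinaryOperator<A> avroMerger; // stand-in for a payload-based merger

  MergeSketch(Function<T, A> toAvro, Function<A, T> fromAvro, BinaryOperator<A> avroMerger) {
    this.toAvro = toAvro;
    this.fromAvro = fromAvro;
    this.avroMerger = avroMerger;
  }

  // Payload-based path: round-trip both sides through the Avro form.
  // A null merge result models a delete and yields an empty Optional.
  Optional<T> mergeViaPayload(T older, T newer) {
    A merged = avroMerger.apply(toAvro.apply(older), toAvro.apply(newer));
    return Optional.ofNullable(merged).map(fromAvro);
  }
}
```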



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodiePositionBasedFileGroupRecordBuffer.java:
##########
@@ -68,11 +68,11 @@ public class HoodiePositionBasedFileGroupRecordBuffer<T> extends HoodieKeyBasedF
 
   public HoodiePositionBasedFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
                                                   HoodieTableMetaClient hoodieTableMetaClient,
+                                                  RecordMergeMode recordMergeMode,

Review Comment:
   Revert the unnecessary changes that do not affect functionality?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java:
##########
@@ -84,23 +96,24 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> implements HoodieFileGr
 
   public HoodieBaseFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
                                          HoodieTableMetaClient hoodieTableMetaClient,
+                                         RecordMergeMode recordMergeMode,
                                          Option<String> partitionNameOverrideOpt,
                                          Option<String[]> partitionPathFieldOpt,
-                                         HoodieRecordMerger recordMerger,
                                          TypedProperties props) {
     this.readerContext = readerContext;
     this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
     this.partitionNameOverrideOpt = partitionNameOverrideOpt;
     this.partitionPathFieldOpt = partitionPathFieldOpt;
-    this.recordMergeMode = getRecordMergeMode(props);
-    this.recordMerger = recordMerger;
+    this.recordMergeMode = recordMergeMode;
+    this.recordMerger = readerContext.getRecordMerger();
+    if (recordMerger.isPresent() && recordMerger.get().getMergingStrategy().equals(PAYLOAD_BASED_MERGER_STRATEGY_UUID)) {
+      this.payloadClass = Option.of(ConfigUtils.getAvroPayloadClass(props));
+    } else {
+      this.payloadClass = Option.empty();
+    }

Review Comment:
   Better to hide all these details in `RecordMergeConfig` to be consistent.
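As a sketch of what "hiding these details in `RecordMergeConfig`" could look like, the strategy-UUID check and payload-class lookup move into one factory method. All names here are hypothetical, and the property key `hoodie.datasource.write.payload.class` is used as a stand-in for whatever `ConfigUtils.getAvroPayloadClass` actually reads:

```java
import java.util.Optional;
import java.util.Properties;

// Hypothetical sketch of the RecordMergeConfig idea from the review:
// resolve the payload class once, inside a config object, instead of
// inline in every record-buffer constructor. Names are illustrative,
// not the actual Hudi API.
final class RecordMergeConfig {
  // Stand-in for Hudi's PAYLOAD_BASED_MERGER_STRATEGY_UUID constant.
  static final String PAYLOAD_BASED_MERGER_STRATEGY_UUID =
      "00000000-0000-0000-0000-000000000000";

  private final Optional<String> payloadClass;

  private RecordMergeConfig(Optional<String> payloadClass) {
    this.payloadClass = payloadClass;
  }

  // Only expose a payload class when the merger uses the payload-based
  // strategy; callers never see the UUID comparison.
  static RecordMergeConfig fromProps(Properties props, Optional<String> mergingStrategy) {
    if (mergingStrategy.isPresent()
        && mergingStrategy.get().equals(PAYLOAD_BASED_MERGER_STRATEGY_UUID)) {
      return new RecordMergeConfig(
          Optional.ofNullable(props.getProperty("hoodie.datasource.write.payload.class")));
    }
    return new RecordMergeConfig(Optional.empty());
  }

  Optional<String> getPayloadClass() {
    return payloadClass;
  }
}
```

With this shape, the record-buffer constructor would only call `RecordMergeConfig.fromProps(...)` and ask for `getPayloadClass()`.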



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java:
##########
@@ -179,7 +183,11 @@ public enum HoodieLogBlockType {
     }
 
     public static HoodieLogBlockType fromId(String id) {
-      return ID_TO_ENUM_MAP.get(id);
+      return ID_TO_ENUM_MAP.get(id.toLowerCase());
+    }
+
+    public String getId() {
+      return id;

Review Comment:
   Is this change necessary?
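For context on the question, here is a minimal stand-in enum (the ids are illustrative letters, not the real `HoodieLogBlockType` ids) showing what the `toLowerCase()` change does: lookups become case-insensitive, which is convenient for tolerant parsing but can also mask callers that supply wrong-case ids:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for HoodieLogBlockType, NOT the real enum or ids.
// Demonstrates the trade-off of lowercasing the id at lookup time.
enum BlockType {
  AVRO_DATA_BLOCK("a"), COMMAND_BLOCK("c");

  private static final Map<String, BlockType> ID_TO_ENUM_MAP = new HashMap<>();

  static {
    for (BlockType t : values()) {
      ID_TO_ENUM_MAP.put(t.id, t);
    }
  }

  private final String id;

  BlockType(String id) {
    this.id = id;
  }

  public static BlockType fromId(String id) {
    // toLowerCase makes the lookup tolerant of upper-case ids,
    // at the cost of hiding callers that produce wrong-case ids.
    return ID_TO_ENUM_MAP.get(id.toLowerCase());
  }

  public String getId() {
    return id;
  }
}
```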



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java:
##########
@@ -84,23 +96,24 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
 
   public HoodieBaseFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
                                          HoodieTableMetaClient 
hoodieTableMetaClient,
+                                         RecordMergeMode recordMergeMode,
                                          Option<String> 
partitionNameOverrideOpt,
                                          Option<String[]> 
partitionPathFieldOpt,
-                                         HoodieRecordMerger recordMerger,
                                          TypedProperties props) {
     this.readerContext = readerContext;
     this.readerSchema = readerContext.getSchemaHandler().getRequiredSchema();
     this.partitionNameOverrideOpt = partitionNameOverrideOpt;
     this.partitionPathFieldOpt = partitionPathFieldOpt;
-    this.recordMergeMode = getRecordMergeMode(props);
-    this.recordMerger = recordMerger;
+    this.recordMergeMode = recordMergeMode;
+    this.recordMerger = readerContext.getRecordMerger();
+    if (recordMerger.isPresent() && 
recordMerger.get().getMergingStrategy().equals(PAYLOAD_BASED_MERGER_STRATEGY_UUID))
 {
+      this.payloadClass = Option.of(ConfigUtils.getAvroPayloadClass(props));
+    } else {
+      this.payloadClass = Option.empty();
+    }
     this.orderingFieldName = 
Option.ofNullable(ConfigUtils.getOrderingField(props)).orElseGet(() -> 
hoodieTableMetaClient.getTableConfig().getPreCombineField());
-    this.orderingFieldType = AvroSchemaUtils.findNestedFieldType(readerSchema, 
this.orderingFieldName).orElse(Schema.Type.INT);
+    this.orderingFieldType = AvroSchemaUtils.findNestedFieldType(readerSchema, 
this.orderingFieldName).orElse(Schema.Type.STRING);

Review Comment:
   There should not be a default ordering field type; defaulting can mask errors. Also, the "(int) 0" value is a special delete ordering, which means such deletes should take higher priority by using processing time. We can revisit this logic in a follow-up.
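A small sketch of the direction this comment suggests (all names are hypothetical, not the Hudi API): keep the ordering field type absent rather than defaulted, and treat the `(int) 0` ordering value as the special processing-time delete marker:

```java
import java.util.Optional;

// Hypothetical sketch, NOT the Hudi API: instead of defaulting the
// ordering field type (INT or STRING), keep it absent when the field is
// not found, and recognize the special "(int) 0" ordering value as a
// processing-time delete marker.
final class OrderingField {
  enum FieldType { INT, LONG, STRING }

  // No default: an empty Optional means "no ordering field configured",
  // so callers must handle that case explicitly instead of silently
  // comparing values under a guessed type.
  private final Optional<FieldType> type;

  OrderingField(Optional<FieldType> type) {
    this.type = type;
  }

  // A delete carrying ordering value (int) 0 takes higher priority,
  // i.e. it is applied in processing-time order regardless of the
  // configured ordering field type.
  boolean isProcessingTimeDelete(Object orderingValue) {
    return orderingValue instanceof Integer && (Integer) orderingValue == 0;
  }

  Optional<FieldType> getType() {
    return type;
  }
}
```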



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieUnmergedFileGroupRecordBuffer.java:
##########
@@ -49,11 +49,11 @@ public class HoodieUnmergedFileGroupRecordBuffer<T> extends 
HoodieBaseFileGroupR
   public HoodieUnmergedFileGroupRecordBuffer(
       HoodieReaderContext<T> readerContext,
       HoodieTableMetaClient hoodieTableMetaClient,
+      RecordMergeMode recordMergeMode,
       Option<String> partitionNameOverrideOpt,
       Option<String[]> partitionPathFieldOpt,
-      HoodieRecordMerger recordMerger,
       TypedProperties props) {
-    super(readerContext, hoodieTableMetaClient, partitionNameOverrideOpt, 
partitionPathFieldOpt, recordMerger, props);
+    super(readerContext, hoodieTableMetaClient, recordMergeMode, 
partitionNameOverrideOpt, partitionPathFieldOpt, props);

Review Comment:
   Same here and in other places, to make the PR easier to review.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
