the-other-tim-brown commented on code in PR #13742:
URL: https://github.com/apache/hudi/pull/13742#discussion_r2308940260


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -449,95 +383,36 @@ protected Option<Pair<HoodieRecord, Schema>> 
getMergedRecord(BufferedRecord<T> o
    * based on {@code CUSTOM} merge mode and a given record payload class.
    */
   private static class CustomPayloadRecordMerger<T> extends 
BaseCustomMerger<T> {
-    private final String[] orderingFieldNames;
     protected final String payloadClass;
 
     public CustomPayloadRecordMerger(
         RecordContext<T> recordContext,
         Option<HoodieRecordMerger> recordMerger,
-        List<String> orderingFieldNames,
         String payloadClass,
         Schema readerSchema,
         TypedProperties props) {
       super(recordContext, recordMerger, readerSchema, props);
-      this.orderingFieldNames = orderingFieldNames.toArray(new String[0]);
       this.payloadClass = payloadClass;
+      props.setProperty(HoodieAvroRecordMerger.PAYLOAD_CLASS_PROP, 
payloadClass);
     }
 
     @Override
-    public Option<BufferedRecord<T>> 
deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T> 
existingRecord) throws IOException {
-      Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema = 
getMergedRecord(existingRecord, newRecord, false);
-      if (mergedRecordAndSchema.isEmpty()) {
-        // An empty Option indicates that the output represents a delete.
-        return Option.of(new BufferedRecord<>(newRecord.getRecordKey(), 
OrderingValues.getDefault(), null, null, HoodieOperation.DELETE));
-      }
-      HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
-      Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
-      // Special handling for SENTINEL record in Expression Payload. This is 
returned if the condition does not match.
-      if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
-        return Option.empty();
-      }
-      T combinedRecordData = 
recordContext.convertAvroRecord(mergedRecord.toIndexedRecord(mergeResultSchema, 
props).get().getData());
+    public Option<BufferedRecord<T>> deltaMergeRecords(BufferedRecord<T> 
newRecord, BufferedRecord<T> existingRecord) throws IOException {
+      BufferedRecord<T> mergedRecord = getMergedRecord(existingRecord, 
newRecord, false);
       // If pre-combine does not return existing record, update it
-      if (combinedRecordData != existingRecord.getRecord()) {
-        // For pkless we need to use record key from existing record
-        return Option.of(BufferedRecords.fromEngineRecord(combinedRecordData, 
mergeResultSchema, recordContext, orderingFieldNames,
-            existingRecord.getRecordKey(), 
mergedRecord.isDelete(mergeResultSchema, props)));
+      if (mergedRecord.getRecord() != existingRecord.getRecord()) {
+        return Option.of(mergedRecord);
       }
       return Option.empty();
     }
 
     @Override
-    public BufferedRecord<T> mergeNonDeleteRecord(BufferedRecord<T> 
olderRecord, BufferedRecord<T> newerRecord) throws IOException {
-      Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema = 
getMergedRecord(olderRecord, newerRecord, true);
-      if (mergedRecordAndSchema.isEmpty()) {
-        return BufferedRecords.createDelete(newerRecord.getRecordKey());
-      }
-      HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
-      Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
-      // Special handling for SENTINEL record in Expression Payload
-      if (mergedRecord.getData() == HoodieRecord.SENTINEL) {
-        return olderRecord;
-      }
-      if (!mergedRecord.isDelete(mergeResultSchema, props)) {
-        IndexedRecord indexedRecord = (IndexedRecord) mergedRecord.getData();
-        return BufferedRecords.fromEngineRecord(
-            recordContext.convertAvroRecord(indexedRecord), mergeResultSchema, 
recordContext, orderingFieldNames, newerRecord.getRecordKey(), false);
-      }
-      return BufferedRecords.createDelete(newerRecord.getRecordKey());
+    public BufferedRecord<T> mergeRecords(BufferedRecord<T> olderRecord, 
BufferedRecord<T> newerRecord) throws IOException {
+      return getMergedRecord(olderRecord, newerRecord, true);
     }
 
-    protected Pair<HoodieRecord, HoodieRecord> 
getDeltaMergeRecords(BufferedRecord<T> olderRecord, BufferedRecord<T> 
newerRecord) {
-      HoodieRecord oldHoodieRecord = constructHoodieAvroRecord(recordContext, 
olderRecord, payloadClass);
-      HoodieRecord newHoodieRecord = constructHoodieAvroRecord(recordContext, 
newerRecord, payloadClass);
-      return Pair.of(oldHoodieRecord, newHoodieRecord);
-    }
-
-    protected Pair<HoodieRecord, HoodieRecord> 
getFinalMergeRecords(BufferedRecord<T> olderRecord, BufferedRecord<T> 
newerRecord) {
-      return getDeltaMergeRecords(olderRecord, newerRecord);
-    }
-
-    protected Option<Pair<HoodieRecord, Schema>> 
getMergedRecord(BufferedRecord<T> olderRecord, BufferedRecord<T> newerRecord, 
boolean isFinalMerge) throws IOException {
-      Pair<HoodieRecord, HoodieRecord> records = isFinalMerge ? 
getFinalMergeRecords(olderRecord, newerRecord) : 
getDeltaMergeRecords(olderRecord, newerRecord);
-      return recordMerger.merge(records.getLeft(), 
getSchemaForAvroPayloadMerge(olderRecord), records.getRight(), 
getSchemaForAvroPayloadMerge(newerRecord), props);
-    }
-
-    protected HoodieRecord constructHoodieAvroRecord(RecordContext<T> 
recordContext, BufferedRecord<T> bufferedRecord, String payloadClass) {
-      GenericRecord record = null;
-      if (!bufferedRecord.isDelete()) {
-        Schema recordSchema = 
recordContext.getSchemaFromBufferRecord(bufferedRecord);
-        record = recordContext.convertToAvroRecord(bufferedRecord.getRecord(), 
recordSchema);
-      }
-      HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), null);
-      return new HoodieAvroRecord<>(hoodieKey,
-          HoodieRecordUtils.loadPayload(payloadClass, record, 
bufferedRecord.getOrderingValue()), null);
-    }
-
-    protected Schema getSchemaForAvroPayloadMerge(BufferedRecord<T> 
bufferedRecord) {
-      if (bufferedRecord.getSchemaId() == null) {
-        return readerSchema;
-      }
-      return recordContext.getSchemaFromBufferRecord(bufferedRecord);
+    protected BufferedRecord<T> getMergedRecord(BufferedRecord<T> olderRecord, 
BufferedRecord<T> newerRecord, boolean isFinalMerge) throws IOException {

Review Comment:
   Cleaned this up, along with some more code, now that the payload/custom merger
paths are more similar.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/HoodieSparkRecordMerger.java:
##########
@@ -39,33 +41,33 @@ public HoodieRecord.HoodieRecordType getRecordType() {
    * Basic handling of deletes that is used by many of the spark mergers
    * returns null if merger specific logic should be used
    */
-  protected Option<Pair<HoodieRecord, Schema>> handleDeletes(HoodieRecord 
older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties 
props) {
+  protected Pair<HoodieRecord, Schema> handleDeletes(HoodieRecord older, 
Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) {

Review Comment:
   Removed this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to