the-other-tim-brown commented on code in PR #13242:
URL: https://github.com/apache/hudi/pull/13242#discussion_r2105690360


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -410,160 +282,29 @@ protected Option<Pair<Function<T, T>, Schema>> 
composeEvolvedSchemaTransformer(
   }
 
   /**
-   * Merge two records using the configured record merger.
+   * Merge record from base file and log file using the configured merger.
    *
-   * @param olderRecord  old {@link BufferedRecord}
-   * @param newerRecord  newer {@link BufferedRecord}
-   * @return a value pair, left is boolean value `isDelete`, and right is 
engine row.
+   * @param baseRecord  old {@link BufferedRecord} from the base file
+   * @param logRecord  newer {@link BufferedRecord} from the log file, may be 
null
+   * @return a {@link BufferedRecord<T>} for the result.
    * @throws IOException
    */
-  protected Pair<Boolean, T> merge(BufferedRecord<T> olderRecord, 
BufferedRecord<T> newerRecord) throws IOException {
-    if (enablePartialMerging) {
-      // TODO(HUDI-7843): decouple the merging logic from the merger
-      //  and use the record merge mode to control how to merge partial updates
-      Option<Pair<HoodieRecord, Schema>> mergedRecord = 
recordMerger.get().partialMerge(
-          readerContext.constructHoodieRecord(olderRecord), 
readerContext.getSchemaFromBufferRecord(olderRecord),
-          readerContext.constructHoodieRecord(newerRecord), 
readerContext.getSchemaFromBufferRecord(newerRecord),
-          readerSchema, props);
-
-      if (mergedRecord.isPresent()
-          && 
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
-        HoodieRecord hoodieRecord = mergedRecord.get().getLeft();
-        if (!mergedRecord.get().getRight().equals(readerSchema)) {
-          T data = (T) 
hoodieRecord.rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, 
readerSchema).getData();
-          return Pair.of(false, data);
-        }
-        return Pair.of(false, (T) hoodieRecord.getData());
-      }
-      return Pair.of(true, null);
-    } else {
-      switch (recordMergeMode) {
-        case COMMIT_TIME_ORDERING:
-          return Pair.of(newerRecord.isDelete(), newerRecord.getRecord());
-        case EVENT_TIME_ORDERING:
-          if (newerRecord.isCommitTimeOrderingDelete()) {
-            return Pair.of(true, newerRecord.getRecord());
-          }
-          Comparable newOrderingValue = newerRecord.getOrderingValue();
-          Comparable oldOrderingValue = olderRecord.getOrderingValue();
-          if (!olderRecord.isCommitTimeOrderingDelete()
-              && oldOrderingValue.compareTo(newOrderingValue) > 0) {
-            return Pair.of(olderRecord.isDelete(), olderRecord.getRecord());
-          }
-          return Pair.of(newerRecord.isDelete(), newerRecord.getRecord());
-        case CUSTOM:
-        default:
-          if (payloadClass.isPresent()) {
-            if (olderRecord.isDelete() || newerRecord.isDelete()) {
-              if (shouldKeepNewerRecord(olderRecord, newerRecord)) {
-                // IMPORTANT:
-                // this is needed when the fallback HoodieAvroRecordMerger got 
used, the merger would
-                // return Option.empty when the new payload data is empty(a 
delete) and ignores its ordering value directly.
-                return Pair.of(newerRecord.isDelete(), 
newerRecord.getRecord());
-              } else {
-                return Pair.of(olderRecord.isDelete(), 
olderRecord.getRecord());
-              }
-            }
-            Option<Pair<HoodieRecord, Schema>> mergedRecord =
-                getMergedRecord(olderRecord, newerRecord);
-            if (mergedRecord.isPresent()
-                && 
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
-              IndexedRecord indexedRecord;
-              if (!mergedRecord.get().getRight().equals(readerSchema)) {
-                indexedRecord = (IndexedRecord) 
mergedRecord.get().getLeft().rewriteRecordWithNewSchema(mergedRecord.get().getRight(),
 null, readerSchema).getData();
-              } else {
-                indexedRecord = (IndexedRecord) 
mergedRecord.get().getLeft().getData();
-              }
-              return Pair.of(false, 
readerContext.convertAvroRecord(indexedRecord));
-            }
-            return Pair.of(true, null);
-          } else {
-            if (olderRecord.isDelete() || newerRecord.isDelete()) {
-              if (shouldKeepNewerRecord(olderRecord, newerRecord)) {
-                // IMPORTANT:
-                // this is needed when the fallback HoodieAvroRecordMerger got 
used, the merger would
-                // return Option.empty when the new payload data is empty(a 
delete) and ignores its ordering value directly.
-                return Pair.of(newerRecord.isDelete(), 
newerRecord.getRecord());
-              } else {
-                return Pair.of(olderRecord.isDelete(), 
olderRecord.getRecord());
-              }
-            }
-            Option<Pair<HoodieRecord, Schema>> mergedRecord = 
recordMerger.get().merge(
-                readerContext.constructHoodieRecord(olderRecord), 
readerContext.getSchemaFromBufferRecord(olderRecord),
-                readerContext.constructHoodieRecord(newerRecord), 
readerContext.getSchemaFromBufferRecord(newerRecord), props);
-            if (mergedRecord.isPresent()
-                && 
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
-              HoodieRecord hoodieRecord = mergedRecord.get().getLeft();
-              if (!mergedRecord.get().getRight().equals(readerSchema)) {
-                return Pair.of(false, (T) 
hoodieRecord.rewriteRecordWithNewSchema(mergedRecord.get().getRight(), null, 
readerSchema).getData());
-              }
-              return Pair.of(false, (T) hoodieRecord.getData());
-            }
-            return Pair.of(true, null);
-          }
-      }
-    }
-  }
-
-  /**
-   * Decides whether to keep the incoming record with ordering value 
comparison.
-   */
-  private boolean shouldKeepNewerRecord(BufferedRecord<T> oldRecord, 
BufferedRecord<T> newRecord) {
-    if (newRecord.isCommitTimeOrderingDelete()) {
-      // handle records coming from DELETE statements(the orderingVal is 
constant 0)
-      return true;
-    }
-    return 
newRecord.getOrderingValue().compareTo(oldRecord.getOrderingValue()) >= 0;
-  }
-
-  private Option<Pair<HoodieRecord, Schema>> getMergedRecord(BufferedRecord<T> 
olderRecord, BufferedRecord<T> newerRecord) throws IOException {
-    ValidationUtils.checkArgument(!Objects.equals(payloadClass, 
OverwriteWithLatestAvroPayload.class.getCanonicalName())
-        && !Objects.equals(payloadClass, 
DefaultHoodieRecordPayload.class.getCanonicalName()));
-    HoodieRecord oldHoodieRecord = constructHoodieAvroRecord(readerContext, 
olderRecord);
-    HoodieRecord newHoodieRecord = constructHoodieAvroRecord(readerContext, 
newerRecord);
-    Option<Pair<HoodieRecord, Schema>> mergedRecord = recordMerger.get().merge(
-        oldHoodieRecord, getSchemaForAvroPayloadMerge(oldHoodieRecord, 
olderRecord),
-        newHoodieRecord, getSchemaForAvroPayloadMerge(newHoodieRecord, 
newerRecord), props);
-    return mergedRecord;
-  }
-
-  /**
-   * Constructs a new {@link HoodieAvroRecord} for payload based merging
-   *
-   * @param readerContext reader context
-   * @param bufferedRecord buffered record
-   * @return A new instance of {@link HoodieRecord}.
-   */
-  private HoodieRecord constructHoodieAvroRecord(HoodieReaderContext<T> 
readerContext, BufferedRecord<T> bufferedRecord) {
-    GenericRecord record = null;
-    if (!bufferedRecord.isDelete()) {
-      Schema recordSchema = 
readerContext.getSchemaFromBufferRecord(bufferedRecord);
-      record = readerContext.convertToAvroRecord(bufferedRecord.getRecord(), 
recordSchema);
-    }
-    HoodieKey hoodieKey = new HoodieKey(bufferedRecord.getRecordKey(), null);
-    return new HoodieAvroRecord<>(hoodieKey,
-        HoodieRecordUtils.loadPayload(payloadClass.get(), record, 
bufferedRecord.getOrderingValue()), null);
-  }
-
-  private Schema getSchemaForAvroPayloadMerge(HoodieRecord record, 
BufferedRecord<T> bufferedRecord) throws IOException {
-    if (record.isDelete(readerSchema, props)) {
-      return readerSchema;
-    }
-    return readerContext.getSchemaFromBufferRecord(bufferedRecord);
+  private BufferedRecord<T> merge(BufferedRecord<T> baseRecord, 
BufferedRecord<T> logRecord) throws IOException {

Review Comment:
   This signature changed because creating a Pair per record is unnecessary 
overhead. We already have the BufferedRecord, which tells us whether or not 
the result is a delete. The Pair also uses a `Boolean`, so it incurs 
autoboxing overhead as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to