danny0405 commented on code in PR #13600:
URL: https://github.com/apache/hudi/pull/13600#discussion_r2251059110


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -331,7 +338,7 @@ public static HoodieIndex 
createUserDefinedIndex(HoodieWriteConfig config) {
    * @return {@link HoodieRecord}s that have the current location being set.
    */
   private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
-      HoodieData<Pair<String, String>> partitionLocations, HoodieWriteConfig 
config, HoodieTable hoodieTable) {
+      HoodieData<Pair<String, String>> partitionLocations, HoodieWriteConfig 
config, HoodieTable hoodieTable, ReaderContextFactory<R> readerContextFactory, 
Schema dataSchema) {

Review Comment:
   Did we intern the schema already?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -493,10 +513,38 @@ public static <R> HoodieData<HoodieRecord<R>> 
mergeForPartitionUpdatesIfNeeded(
         .filter(p -> p.getRight().isPresent())
         .map(p -> Pair.of(p.getRight().get().getPartitionPath(), 
p.getRight().get().getFileId()))
         .distinct(updatedConfig.getGlobalIndexReconcileParallelism());
+    // define the buffered record merger.
+    ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>) 
hoodieTable.getContext()
+        .<R>getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), 
config.getRecordMerger().getRecordType(), config.getProps());
+    HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
+    RecordContext<R> incomingRecordContext = readerContext.getRecordContext();
+    readerContext.initRecordMerger(config.getProps());
+    // Create a reader context for the existing records. In the case of 
merge-into commands, the incoming records
+    // can be using an expression payload so here we rely on the table's 
configured payload class if it is required.
+    ReaderContextFactory<R> readerContextFactoryForExistingRecords = 
(ReaderContextFactory<R>) hoodieTable.getContext()
+        .<R>getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), 
config.getRecordMerger().getRecordType(), 
hoodieTable.getMetaClient().getTableConfig().getProps());
+    RecordContext<R> existingRecordContext = 
readerContextFactoryForExistingRecords.getContext().getRecordContext();
     // merged existing records with current locations being set
-    HoodieData<HoodieRecord<R>> existingRecords = 
getExistingRecords(globalLocations, keyGeneratorWriteConfigOpt.getLeft(), 
hoodieTable);
-
-    final HoodieRecordMerger recordMerger = updatedConfig.getRecordMerger();
+    SerializableSchema writerSchema = new 
SerializableSchema(hoodieTable.getConfig().getWriteSchema());

Review Comment:
   why we need a `SerializableSchema` here? isn't the avro `Schema` already 
serializable?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -493,10 +513,38 @@ public static <R> HoodieData<HoodieRecord<R>> 
mergeForPartitionUpdatesIfNeeded(
         .filter(p -> p.getRight().isPresent())
         .map(p -> Pair.of(p.getRight().get().getPartitionPath(), 
p.getRight().get().getFileId()))
         .distinct(updatedConfig.getGlobalIndexReconcileParallelism());
+    // define the buffered record merger.
+    ReaderContextFactory<R> readerContextFactory = (ReaderContextFactory<R>) 
hoodieTable.getContext()
+        .<R>getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), 
config.getRecordMerger().getRecordType(), config.getProps());
+    HoodieReaderContext<R> readerContext = readerContextFactory.getContext();
+    RecordContext<R> incomingRecordContext = readerContext.getRecordContext();
+    readerContext.initRecordMerger(config.getProps());
+    // Create a reader context for the existing records. In the case of 
merge-into commands, the incoming records
+    // can be using an expression payload so here we rely on the table's 
configured payload class if it is required.
+    ReaderContextFactory<R> readerContextFactoryForExistingRecords = 
(ReaderContextFactory<R>) hoodieTable.getContext()
+        .<R>getReaderContextFactoryForWrite(hoodieTable.getMetaClient(), 
config.getRecordMerger().getRecordType(), 
hoodieTable.getMetaClient().getTableConfig().getProps());
+    RecordContext<R> existingRecordContext = 
readerContextFactoryForExistingRecords.getContext().getRecordContext();
     // merged existing records with current locations being set
-    HoodieData<HoodieRecord<R>> existingRecords = 
getExistingRecords(globalLocations, keyGeneratorWriteConfigOpt.getLeft(), 
hoodieTable);
-
-    final HoodieRecordMerger recordMerger = updatedConfig.getRecordMerger();
+    SerializableSchema writerSchema = new 
SerializableSchema(hoodieTable.getConfig().getWriteSchema());
+    SerializableSchema writerSchemaWithMetaFields = new 
SerializableSchema(HoodieAvroUtils.addMetadataFields(writerSchema.get(), 
updatedConfig.allowOperationMetadataField()));
+    // Read the existing records with the meta fields and current writer 
schema as the output schema
+    HoodieData<HoodieRecord<R>> existingRecords =
+        getExistingRecords(globalLocations, 
keyGeneratorWriteConfigOpt.getLeft(), hoodieTable, 
readerContextFactoryForExistingRecords, writerSchemaWithMetaFields.get());
+    List<String> orderingFieldNames = getOrderingFieldNames(
+        readerContext.getMergeMode(), hoodieTable.getConfig().getProps(), 
hoodieTable.getMetaClient());
+    RecordMergeMode recordMergeMode = 
HoodieTableConfig.inferCorrectMergingBehavior(null, config.getPayloadClass(), 
null,
+        String.join(",", orderingFieldNames), 
hoodieTable.getMetaClient().getTableConfig().getTableVersion()).getLeft();
+    BufferedRecordMerger<R> recordMerger = BufferedRecordMergerFactory.create(
+        readerContext,
+        recordMergeMode,
+        false,
+        Option.ofNullable(updatedConfig.getRecordMerger()),

Review Comment:
   always use `HoodieReaderContext.getRecordMerger`



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -85,9 +102,70 @@ public I combineOnCondition(
    * @return Collection of HoodieRecord already be deduplicated
    */
   public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int 
parallelism) {
+    HoodieReaderContext<T> readerContext =
+        (HoodieReaderContext<T>) 
table.getContext().<T>getReaderContextFactoryForWrite(table.getMetaClient(), 
table.getConfig().getRecordMerger().getRecordType(), 
table.getConfig().getProps())
+            .getContext();
+    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+    readerContext.initRecordMerger(table.getConfig().getProps());
+    List<String> orderingFieldNames = 
HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(), 
table.getConfig().getProps(), table.getMetaClient());
     HoodieRecordMerger recordMerger = 
HoodieRecordUtils.mergerToPreCombineMode(table.getConfig().getRecordMerger());
-    return deduplicateRecords(records, table.getIndex(), parallelism, 
table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
+    RecordMergeMode recordMergeMode = 
HoodieTableConfig.inferCorrectMergingBehavior(null, 
table.getConfig().getPayloadClass(), null,
+        String.join(",", orderingFieldNames), 
tableConfig.getTableVersion()).getLeft();
+    Schema recordSchema;
+    if (StringUtils.nonEmpty(table.getConfig().getPartialUpdateSchema())) {
+      recordSchema = new 
Schema.Parser().parse(table.getConfig().getPartialUpdateSchema());
+    } else {
+      recordSchema = new 
Schema.Parser().parse(table.getConfig().getWriteSchema());
+    }
+    BufferedRecordMerger<T> bufferedRecordMerger = 
BufferedRecordMergerFactory.create(
+        readerContext,
+        recordMergeMode,
+        false,
+        Option.ofNullable(recordMerger),
+        orderingFieldNames,
+        Option.ofNullable(table.getConfig().getPayloadClass()),
+        recordSchema,
+        table.getConfig().getProps(),
+        tableConfig.getPartialUpdateMode());
+    return deduplicateRecords(
+        records,
+        table.getIndex(),
+        parallelism,
+        table.getConfig().getSchema(),
+        table.getConfig().getProps(),
+        bufferedRecordMerger,
+        readerContext,
+        orderingFieldNames.toArray(new String[0]));
   }
 
-  public abstract I deduplicateRecords(I records, HoodieIndex<?, ?> index, int 
parallelism, String schema, TypedProperties props, HoodieRecordMerger merger);
+  public abstract I deduplicateRecords(I records,
+                                       HoodieIndex<?, ?> index,
+                                       int parallelism,
+                                       String schema,
+                                       TypedProperties props,
+                                       BufferedRecordMerger<T> merger,
+                                       HoodieReaderContext<T> readerContext,
+                                       String[] orderingFieldNames);
+
+  protected static <T> HoodieRecord<T> reduceRecords(TypedProperties props, 
BufferedRecordMerger<T> recordMerger, String[] orderingFieldNames,
+                                                     HoodieRecord<T> previous, 
HoodieRecord<T> next, Schema schema, RecordContext<T> recordContext) {
+    try {
+      // NOTE: The order of previous and next is uncertain within a batch in 
"reduceByKey".
+      // If the return value is empty, it means the previous should be chosen.
+      BufferedRecord<T> newBufferedRecord = 
BufferedRecord.forRecordWithContext(next, schema, recordContext, props, 
orderingFieldNames);
+      // Construct old buffered record.
+      BufferedRecord<T> oldBufferedRecord = 
BufferedRecord.forRecordWithContext(previous, schema, recordContext, props, 
orderingFieldNames);
+      // Run merge.
+      Option<BufferedRecord<T>> merged = 
recordMerger.deltaMerge(newBufferedRecord, oldBufferedRecord);
+      // NOTE: For merge mode based merging, it returns non-null.
+      //       For mergers / payloads based merging, it may return null.
+      HoodieRecord<T> reducedRecord = 
merged.map(recordContext::constructHoodieRecord).orElse(previous);
+      boolean choosePrevious = merged.isEmpty();
+      HoodieKey reducedKey = choosePrevious ? previous.getKey() : 
next.getKey();
+      HoodieOperation operation = choosePrevious ? previous.getOperation() : 
next.getOperation();
+      return reducedRecord.newInstance(reducedKey, operation);

Review Comment:
   the location info got lost, some write handles like append handle needs this 
into to decide whether the record is an update, we can use the location of the 
previous record. If it is the partial update mode, new record always generated, 
we will get a record with not location setup there.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -411,30 +416,40 @@ private static <R> Option<HoodieRecord<R>> 
mergeIncomingWithExistingRecordWithEx
       HoodieRecord<R> incoming,
       HoodieRecord<R> existing,
       Schema writeSchema,
-      Schema existingSchema,
       Schema writeSchemaWithMetaFields,
       HoodieWriteConfig config,
-      HoodieRecordMerger recordMerger,
-      BaseKeyGenerator keyGenerator) throws IOException {
-    Option<Pair<HoodieRecord, Schema>> mergeResult = 
recordMerger.merge(existing, existingSchema,
-        incoming, writeSchemaWithMetaFields, config.getProps());
-    if (!mergeResult.isPresent()) {
+      BufferedRecordMerger<R> recordMerger,
+      BaseKeyGenerator keyGenerator,
+      RecordContext<R> incomingRecordContext,
+      RecordContext<R> existingRecordContext,
+      String[] orderingFieldNames) throws IOException {
+    BufferedRecord<R> incomingBufferedRecord = 
BufferedRecord.forRecordWithContext(incoming, writeSchemaWithMetaFields, 
incomingRecordContext, config.getProps(), orderingFieldNames);
+    BufferedRecord<R> existingBufferedRecord = 
BufferedRecord.forRecordWithContext(existing, writeSchemaWithMetaFields, 
existingRecordContext, config.getProps(), orderingFieldNames);
+    MergeResult<R> mergeResult = 
recordMerger.finalMerge(existingBufferedRecord, incomingBufferedRecord);
+    if (mergeResult.isDelete()) {
       //the record was deleted
       return Option.empty();
     }
-    HoodieRecord<R> result = mergeResult.get().getLeft();
+    if (mergeResult.getMergedRecord() == null) {
+      // SENTINEL case: the record did not match and merge case and should not 
be modified
+      return Option.of((HoodieRecord<R>) new 
HoodieAvroIndexedRecord(HoodieRecord.SENTINEL));
+    }
+
+    BufferedRecord<R> resultingBufferedRecord = 
BufferedRecord.forRecordWithContext(mergeResult.getMergedRecord(), 
writeSchemaWithMetaFields,
+        existingRecordContext, orderingFieldNames, existing.getRecordKey(), 
false);
+    HoodieRecord<R> result = 
existingRecordContext.constructHoodieRecord(resultingBufferedRecord);
+
     if (result.getData().equals(HoodieRecord.SENTINEL)) {
       //the record did not match and merge case and should not be modified
       return Option.of(result);
     }
 
     //record is inserted or updated
-    String partitionPath = keyGenerator.getPartitionPath((GenericRecord) 
result.getData());
+    String partitionPath = 
keyGenerator.getPartitionPath(existingRecordContext.convertToAvroRecord(mergeResult.getMergedRecord(),
 writeSchemaWithMetaFields));

Review Comment:
   This is too costly to just fetch a partition path, can we just use 
`incoming.getPartitionPath`? The two should belong to the same partition.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java:
##########
@@ -226,7 +233,18 @@ private void initRecordConverter() {
   }
 
   private void initMergeClass() {
-    recordMerger = 
HoodieRecordUtils.mergerToPreCombineMode(writeClient.getConfig().getRecordMerger());
+    readerContext = 
writeClient.getEngineContext().<RowData>getReaderContextFactory(metaClient).getContext();
+    orderingFieldNames = getOrderingFieldNames(readerContext.getMergeMode(), 
writeClient.getConfig().getProps(), metaClient);
+    recordMerger = BufferedRecordMergerFactory.create(
+        readerContext,
+        writeClient.getConfig().getRecordMergeMode(),
+        false,
+        Option.ofNullable(writeClient.getConfig().getRecordMerger()),
+        orderingFieldNames,
+        Option.ofNullable(writeClient.getConfig().getPayloadClass()),
+        new SerializableSchema(writeClient.getConfig().getSchema()).get(),

Review Comment:
   Can we write some utiity method in `HoodieAvroUtils` to parse string as avro 
schema instead of using the `SerializableSchema` constructor.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -444,41 +459,46 @@ private static <R> Option<HoodieRecord<R>> 
mergeIncomingWithExistingRecord(
       HoodieRecord<R> incoming,
       HoodieRecord<R> existing,
       Schema writeSchema,
+      Schema writeSchemaWithMetaFields,
       HoodieWriteConfig config,
-      HoodieRecordMerger recordMerger,
-      Option<BaseKeyGenerator> expressionPayloadKeygen) throws IOException {
-    Schema existingSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()), 
config.allowOperationMetadataField());
-    Schema writeSchemaWithMetaFields = 
HoodieAvroUtils.addMetadataFields(writeSchema, 
config.allowOperationMetadataField());
+      BufferedRecordMerger<R> recordMerger,
+      Option<BaseKeyGenerator> expressionPayloadKeygen,
+      RecordContext<R> incomingRecordContext,
+      RecordContext<R> existingRecordContext,
+      String[] orderingFieldNames) throws IOException {
     if (expressionPayloadKeygen.isPresent()) {
       return mergeIncomingWithExistingRecordWithExpressionPayload(incoming, 
existing, writeSchema,
-          existingSchema, writeSchemaWithMetaFields, config, recordMerger, 
expressionPayloadKeygen.get());
+          writeSchemaWithMetaFields, config, recordMerger, 
expressionPayloadKeygen.get(), incomingRecordContext, existingRecordContext, 
orderingFieldNames);
     } else {
       // prepend the hoodie meta fields as the incoming record does not have 
them
       HoodieRecord incomingPrepended = incoming
           .prependMetaFields(writeSchema, writeSchemaWithMetaFields, new 
MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()),
 config.getProps());
-      // after prepend the meta fields, convert the record back to the 
original payload
-      HoodieRecord incomingWithMetaFields = incomingPrepended
-          .wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields, 
config.getProps(), Option.empty(), config.allowOperationMetadataField(), 
Option.empty(), false, Option.empty());
-      Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger
-          .merge(existing, existingSchema, incomingWithMetaFields, 
writeSchemaWithMetaFields, config.getProps());
-      if (mergeResult.isPresent()) {
-        // the merged record needs to be converted back to the original payload
-        HoodieRecord<R> merged = 
mergeResult.get().getLeft().wrapIntoHoodieRecordPayloadWithParams(
-            writeSchemaWithMetaFields, config.getProps(), Option.empty(),
-            config.allowOperationMetadataField(), Option.empty(), false, 
Option.of(writeSchema));
-        return Option.of(merged);
-      } else {
+      BufferedRecord<R> incomingBufferedRecord = 
BufferedRecord.forRecordWithContext(incomingPrepended, 
writeSchemaWithMetaFields, incomingRecordContext, config.getProps(), 
orderingFieldNames);
+      BufferedRecord<R> existingBufferedRecord = 
BufferedRecord.forRecordWithContext(existing, writeSchemaWithMetaFields, 
existingRecordContext, config.getProps(), orderingFieldNames);
+      MergeResult<R> mergeResult = 
recordMerger.finalMerge(existingBufferedRecord, incomingBufferedRecord);
+
+      if (mergeResult.isDelete()) {
+        // the record was deleted
         return Option.empty();
       }
+      BufferedRecord<R> resultingBufferedRecord = 
BufferedRecord.forRecordWithContext(mergeResult.getMergedRecord(), 
writeSchemaWithMetaFields, existingRecordContext,
+          orderingFieldNames, existing.getRecordKey(), 
mergeResult.getMergedRecord() == null);
+      HoodieRecord<R> result = 
existingRecordContext.constructHoodieRecord(resultingBufferedRecord);
+      // the merged record needs to be converted back to the original payload
+      return 
Option.of(result.wrapIntoHoodieRecordPayloadWithParams(writeSchemaWithMetaFields,
 config.getProps(), Option.empty(),

Review Comment:
   this is also very costly.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java:
##########
@@ -85,9 +102,70 @@ public I combineOnCondition(
    * @return Collection of HoodieRecord already be deduplicated
    */
   public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int 
parallelism) {
+    HoodieReaderContext<T> readerContext =
+        (HoodieReaderContext<T>) 
table.getContext().<T>getReaderContextFactoryForWrite(table.getMetaClient(), 
table.getConfig().getRecordMerger().getRecordType(), 
table.getConfig().getProps())
+            .getContext();
+    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+    readerContext.initRecordMerger(table.getConfig().getProps());
+    List<String> orderingFieldNames = 
HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(), 
table.getConfig().getProps(), table.getMetaClient());
     HoodieRecordMerger recordMerger = 
HoodieRecordUtils.mergerToPreCombineMode(table.getConfig().getRecordMerger());
-    return deduplicateRecords(records, table.getIndex(), parallelism, 
table.getConfig().getSchema(), table.getConfig().getProps(), recordMerger);
+    RecordMergeMode recordMergeMode = 
HoodieTableConfig.inferCorrectMergingBehavior(null, 
table.getConfig().getPayloadClass(), null,
+        String.join(",", orderingFieldNames), 
tableConfig.getTableVersion()).getLeft();
+    Schema recordSchema;
+    if (StringUtils.nonEmpty(table.getConfig().getPartialUpdateSchema())) {
+      recordSchema = new 
Schema.Parser().parse(table.getConfig().getPartialUpdateSchema());

Review Comment:
   let's intern these schema with `AvroSchemaCache`.



##########
hudi-common/src/main/java/org/apache/hudi/common/engine/RecordContext.java:
##########
@@ -39,39 +41,54 @@
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.function.BiFunction;
 
 import static 
org.apache.hudi.common.model.HoodieRecord.HOODIE_IS_DELETED_FIELD;
 import static 
org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
 
+/**
+ * Record context provides the APIs for record related operations. Record 
context is associated with
+ * a corresponding {@link HoodieReaderContext} and is used for getting field 
values from a record,
+ * transforming a record etc.
+ */
 public abstract class RecordContext<T> implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
-  private SerializableBiFunction<T, Schema, String> recordKeyExtractor;
+  private final SerializableBiFunction<T, Schema, String> recordKeyExtractor;
   // for encoding and decoding schemas to the spillable map
   private final LocalAvroSchemaCache localAvroSchemaCache = 
LocalAvroSchemaCache.getInstance();
 
   protected JavaTypeConverter typeConverter;
-  protected String partitionPath;
+  protected String partitionPath = null;

Review Comment:
   Let's make the partition path non nullalbe.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -394,30 +407,36 @@ public MergeResult<T> 
mergeNonDeleteRecord(BufferedRecord<T> olderRecord, Buffer
    * based on {@code CUSTOM} merge mode and a given record payload class.
    */
   private static class CustomPayloadRecordMerger<T> extends 
BaseCustomMerger<T> {
-    private final List<String> orderingFieldNames;
-    private final String payloadClass;
+    private final String[] orderingFieldNames;
+    private final String basePayloadClass;
+    private final String incomingPayloadClass;
 
     public CustomPayloadRecordMerger(
         RecordContext<T> recordContext,
         Option<HoodieRecordMerger> recordMerger,
         List<String> orderingFieldNames,
-        String payloadClass,
+        String basePayloadClass,
+        String incomingPayloadClass,
         Schema readerSchema,
         TypedProperties props) {
       super(recordContext, recordMerger, readerSchema, props);
-      this.orderingFieldNames = orderingFieldNames;
-      this.payloadClass = payloadClass;
+      this.orderingFieldNames = orderingFieldNames.toArray(new String[0]);
+      this.basePayloadClass = basePayloadClass;
+      this.incomingPayloadClass = incomingPayloadClass;
     }
 
     @Override
     public Option<BufferedRecord<T>> 
deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T> 
existingRecord) throws IOException {
-      Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = 
getMergedRecord(existingRecord, newRecord);
-      if (combinedRecordAndSchemaOpt.isPresent()) {
-        T combinedRecordData = recordContext.convertAvroRecord((IndexedRecord) 
combinedRecordAndSchemaOpt.get().getLeft().getData());
+      Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = 
getMergedRecord(existingRecord, newRecord, false);
+      if (combinedRecordAndSchemaOpt.map(combinedRecordAndSchema -> 
combinedRecordAndSchema.getRight() != null).orElse(false)) {

Review Comment:
   why the schema can be null, can we fix the merger to return it correctly?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -427,31 +446,35 @@ public Option<BufferedRecord<T>> 
deltaMergeNonDeleteRecord(BufferedRecord<T> new
 
     @Override
     public MergeResult<T> mergeNonDeleteRecord(BufferedRecord<T> olderRecord, 
BufferedRecord<T> newerRecord) throws IOException {
-      Option<Pair<HoodieRecord, Schema>> mergedRecord =
-          getMergedRecord(olderRecord, newerRecord);
-      if (mergedRecord.isPresent()
-          && 
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
+      Option<Pair<HoodieRecord, Schema>> mergedRecordAndSchema = 
getMergedRecord(olderRecord, newerRecord, true);
+      if (mergedRecordAndSchema.isEmpty()) {
+        return new MergeResult<>(true, null);
+      }
+      HoodieRecord mergedRecord = mergedRecordAndSchema.get().getLeft();
+      Schema mergeResultSchema = mergedRecordAndSchema.get().getRight();
+      // Special handling for SENTINEL record in Expression Payload
+      if (mergedRecord.getData() == HoodieRecord.SENTINEL) {

Review Comment:
   +1



##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java:
##########
@@ -79,15 +90,27 @@ public String getMetaFieldValue(IndexedRecord record, int 
pos) {
   }
 
   @Override
-  public HoodieRecord<IndexedRecord> 
constructHoodieRecord(BufferedRecord<IndexedRecord> bufferedRecord) {
+  public HoodieRecord constructHoodieRecord(BufferedRecord<IndexedRecord> 
bufferedRecord) {
+    // HoodieKey is not required so do not generate it if partitionPath is null
+    HoodieKey hoodieKey = partitionPath == null ? null : new 
HoodieKey(bufferedRecord.getRecordKey(), partitionPath);

Review Comment:
   Can we always set up the partition path for recordContext? the null key for 
hoodie record will throw NPE for `HoodieRecord.getRecordKey`



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -394,30 +407,36 @@ public MergeResult<T> 
mergeNonDeleteRecord(BufferedRecord<T> olderRecord, Buffer
    * based on {@code CUSTOM} merge mode and a given record payload class.
    */
   private static class CustomPayloadRecordMerger<T> extends 
BaseCustomMerger<T> {
-    private final List<String> orderingFieldNames;
-    private final String payloadClass;
+    private final String[] orderingFieldNames;
+    private final String basePayloadClass;
+    private final String incomingPayloadClass;

Review Comment:
   curious why the incoming and base has two different payload classes? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to