danny0405 commented on code in PR #10368:
URL: https://github.com/apache/hudi/pull/10368#discussion_r1432195897


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -251,25 +310,31 @@ private static <R> Option<HoodieRecord<R>> 
mergeIncomingWithExistingRecord(
       HoodieRecord<R> existing,
       Schema writeSchema,
       HoodieWriteConfig config,
-      HoodieRecordMerger recordMerger) throws IOException {
+      HoodieRecordMerger recordMerger,
+      Option<Pair<BaseKeyGenerator, HoodieWriteConfig>> 
keyGeneratorWriteConfigOpt) throws IOException {
     Schema existingSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()), 
config.allowOperationMetadataField());
     Schema writeSchemaWithMetaFields = 
HoodieAvroUtils.addMetadataFields(writeSchema, 
config.allowOperationMetadataField());
-    // prepend the hoodie meta fields as the incoming record does not have them
-    HoodieRecord incomingPrepended = incoming
-        .prependMetaFields(writeSchema, writeSchemaWithMetaFields, new 
MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()),
 config.getProps());
-    // after prepend the meta fields, convert the record back to the original 
payload
-    HoodieRecord incomingWithMetaFields = incomingPrepended
-        .wrapIntoHoodieRecordPayloadWithParams(writeSchema, config.getProps(), 
Option.empty(), config.allowOperationMetadataField(), Option.empty(), false, 
Option.empty());
-    Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger
-        .merge(existing, existingSchema, incomingWithMetaFields, 
writeSchemaWithMetaFields, config.getProps());
-    if (mergeResult.isPresent()) {
-      // the merged record needs to be converted back to the original payload
-      HoodieRecord<R> merged = 
mergeResult.get().getLeft().wrapIntoHoodieRecordPayloadWithParams(
-          writeSchemaWithMetaFields, config.getProps(), Option.empty(),
-          config.allowOperationMetadataField(), Option.empty(), false, 
Option.of(writeSchema));
-      return Option.of(merged);
+    if (keyGeneratorWriteConfigOpt.isPresent()) {
+      return mergeIncomingWithExistingRecordWithExpressionPayload(incoming, 
existing, writeSchema,
+          existingSchema, writeSchemaWithMetaFields, 
keyGeneratorWriteConfigOpt.get().getRight(), recordMerger, 
keyGeneratorWriteConfigOpt.get().getKey());
     } else {
-      return Option.empty();
+      // prepend the hoodie meta fields as the incoming record does not have 
them
+      HoodieRecord incomingPrepended = incoming
+          .prependMetaFields(writeSchema, writeSchemaWithMetaFields, new 
MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()),
 config.getProps());
+      // after prepend the meta fields, convert the record back to the 
original payload
+      HoodieRecord incomingWithMetaFields = incomingPrepended

Review Comment:
   Maybe not related, but why we always prepend the incoming records with 
metadata fields?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to