danny0405 commented on code in PR #13600:
URL: https://github.com/apache/hudi/pull/13600#discussion_r2252821292


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -411,30 +416,40 @@ private static <R> Option<HoodieRecord<R>> 
mergeIncomingWithExistingRecordWithEx
       HoodieRecord<R> incoming,
       HoodieRecord<R> existing,
       Schema writeSchema,
-      Schema existingSchema,
       Schema writeSchemaWithMetaFields,
       HoodieWriteConfig config,
-      HoodieRecordMerger recordMerger,
-      BaseKeyGenerator keyGenerator) throws IOException {
-    Option<Pair<HoodieRecord, Schema>> mergeResult = 
recordMerger.merge(existing, existingSchema,
-        incoming, writeSchemaWithMetaFields, config.getProps());
-    if (!mergeResult.isPresent()) {
+      BufferedRecordMerger<R> recordMerger,
+      BaseKeyGenerator keyGenerator,
+      RecordContext<R> incomingRecordContext,
+      RecordContext<R> existingRecordContext,
+      String[] orderingFieldNames) throws IOException {
+    BufferedRecord<R> incomingBufferedRecord = 
BufferedRecord.forRecordWithContext(incoming, writeSchemaWithMetaFields, 
incomingRecordContext, config.getProps(), orderingFieldNames);
+    BufferedRecord<R> existingBufferedRecord = 
BufferedRecord.forRecordWithContext(existing, writeSchemaWithMetaFields, 
existingRecordContext, config.getProps(), orderingFieldNames);
+    MergeResult<R> mergeResult = 
recordMerger.finalMerge(existingBufferedRecord, incomingBufferedRecord);
+    if (mergeResult.isDelete()) {
       //the record was deleted
       return Option.empty();
     }
-    HoodieRecord<R> result = mergeResult.get().getLeft();
+    if (mergeResult.getMergedRecord() == null) {
+      // SENTINEL case: the record did not match and merge case and should not 
be modified
+      return Option.of((HoodieRecord<R>) new 
HoodieAvroIndexedRecord(HoodieRecord.SENTINEL));
+    }
+
+    BufferedRecord<R> resultingBufferedRecord = 
BufferedRecord.forRecordWithContext(mergeResult.getMergedRecord(), 
writeSchemaWithMetaFields,
+        existingRecordContext, orderingFieldNames, existing.getRecordKey(), 
false);
+    HoodieRecord<R> result = 
existingRecordContext.constructHoodieRecord(resultingBufferedRecord);
+
     if (result.getData().equals(HoodieRecord.SENTINEL)) {
       //the record did not match and merge case and should not be modified
       return Option.of(result);
     }
 
     //record is inserted or updated
-    String partitionPath = keyGenerator.getPartitionPath((GenericRecord) 
result.getData());
+    String partitionPath = 
keyGenerator.getPartitionPath(existingRecordContext.convertToAvroRecord(mergeResult.getMergedRecord(),
 writeSchemaWithMetaFields));

Review Comment:
   >  two are not guaranteed to have the same partition path
   
   Okay, how can the two be merged if they do not belong to the same partition.
   
   >  whether the output data is the same instance as the existing record or 
new record
   
   Let's do that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to