the-other-tim-brown commented on code in PR #13600:
URL: https://github.com/apache/hudi/pull/13600#discussion_r2252833712


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -411,30 +416,40 @@ private static <R> Option<HoodieRecord<R>> 
mergeIncomingWithExistingRecordWithEx
       HoodieRecord<R> incoming,
       HoodieRecord<R> existing,
       Schema writeSchema,
-      Schema existingSchema,
       Schema writeSchemaWithMetaFields,
       HoodieWriteConfig config,
-      HoodieRecordMerger recordMerger,
-      BaseKeyGenerator keyGenerator) throws IOException {
-    Option<Pair<HoodieRecord, Schema>> mergeResult = 
recordMerger.merge(existing, existingSchema,
-        incoming, writeSchemaWithMetaFields, config.getProps());
-    if (!mergeResult.isPresent()) {
+      BufferedRecordMerger<R> recordMerger,
+      BaseKeyGenerator keyGenerator,
+      RecordContext<R> incomingRecordContext,
+      RecordContext<R> existingRecordContext,
+      String[] orderingFieldNames) throws IOException {
+    BufferedRecord<R> incomingBufferedRecord = 
BufferedRecord.forRecordWithContext(incoming, writeSchemaWithMetaFields, 
incomingRecordContext, config.getProps(), orderingFieldNames);
+    BufferedRecord<R> existingBufferedRecord = 
BufferedRecord.forRecordWithContext(existing, writeSchemaWithMetaFields, 
existingRecordContext, config.getProps(), orderingFieldNames);
+    MergeResult<R> mergeResult = 
recordMerger.finalMerge(existingBufferedRecord, incomingBufferedRecord);
+    if (mergeResult.isDelete()) {
       //the record was deleted
       return Option.empty();
     }
-    HoodieRecord<R> result = mergeResult.get().getLeft();
+    if (mergeResult.getMergedRecord() == null) {
+      // SENTINEL case: the record did not match and merge case and should not 
be modified
+      return Option.of((HoodieRecord<R>) new 
HoodieAvroIndexedRecord(HoodieRecord.SENTINEL));
+    }
+
+    BufferedRecord<R> resultingBufferedRecord = 
BufferedRecord.forRecordWithContext(mergeResult.getMergedRecord(), 
writeSchemaWithMetaFields,
+        existingRecordContext, orderingFieldNames, existing.getRecordKey(), 
false);
+    HoodieRecord<R> result = 
existingRecordContext.constructHoodieRecord(resultingBufferedRecord);
+
     if (result.getData().equals(HoodieRecord.SENTINEL)) {
       //the record did not match and merge case and should not be modified
       return Option.of(result);
     }
 
     //record is inserted or updated
-    String partitionPath = keyGenerator.getPartitionPath((GenericRecord) 
result.getData());
+    String partitionPath = 
keyGenerator.getPartitionPath(existingRecordContext.convertToAvroRecord(mergeResult.getMergedRecord(),
 writeSchemaWithMetaFields));

Review Comment:
   > Okay, how can the two be merged if they do not belong to the same 
partition.
   This is what it means for the index to be Global. The records are able to 
move between partitions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to