the-other-tim-brown commented on code in PR #13600:
URL: https://github.com/apache/hudi/pull/13600#discussion_r2252833712
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -411,30 +416,40 @@ private static <R> Option<HoodieRecord<R>>
mergeIncomingWithExistingRecordWithEx
HoodieRecord<R> incoming,
HoodieRecord<R> existing,
Schema writeSchema,
- Schema existingSchema,
Schema writeSchemaWithMetaFields,
HoodieWriteConfig config,
- HoodieRecordMerger recordMerger,
- BaseKeyGenerator keyGenerator) throws IOException {
- Option<Pair<HoodieRecord, Schema>> mergeResult =
recordMerger.merge(existing, existingSchema,
- incoming, writeSchemaWithMetaFields, config.getProps());
- if (!mergeResult.isPresent()) {
+ BufferedRecordMerger<R> recordMerger,
+ BaseKeyGenerator keyGenerator,
+ RecordContext<R> incomingRecordContext,
+ RecordContext<R> existingRecordContext,
+ String[] orderingFieldNames) throws IOException {
+ BufferedRecord<R> incomingBufferedRecord =
BufferedRecord.forRecordWithContext(incoming, writeSchemaWithMetaFields,
incomingRecordContext, config.getProps(), orderingFieldNames);
+ BufferedRecord<R> existingBufferedRecord =
BufferedRecord.forRecordWithContext(existing, writeSchemaWithMetaFields,
existingRecordContext, config.getProps(), orderingFieldNames);
+ MergeResult<R> mergeResult =
recordMerger.finalMerge(existingBufferedRecord, incomingBufferedRecord);
+ if (mergeResult.isDelete()) {
//the record was deleted
return Option.empty();
}
- HoodieRecord<R> result = mergeResult.get().getLeft();
+ if (mergeResult.getMergedRecord() == null) {
+ // SENTINEL case: the record did not match and merge case and should not
be modified
+ return Option.of((HoodieRecord<R>) new
HoodieAvroIndexedRecord(HoodieRecord.SENTINEL));
+ }
+
+ BufferedRecord<R> resultingBufferedRecord =
BufferedRecord.forRecordWithContext(mergeResult.getMergedRecord(),
writeSchemaWithMetaFields,
+ existingRecordContext, orderingFieldNames, existing.getRecordKey(),
false);
+ HoodieRecord<R> result =
existingRecordContext.constructHoodieRecord(resultingBufferedRecord);
+
if (result.getData().equals(HoodieRecord.SENTINEL)) {
//the record did not match and merge case and should not be modified
return Option.of(result);
}
//record is inserted or updated
- String partitionPath = keyGenerator.getPartitionPath((GenericRecord)
result.getData());
+ String partitionPath =
keyGenerator.getPartitionPath(existingRecordContext.convertToAvroRecord(mergeResult.getMergedRecord(),
writeSchemaWithMetaFields));
Review Comment:
> Okay, how can the two be merged if they do not belong to the same
partition.
This is what it means for the index to be Global. The records are able to
move between partitions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]