danny0405 commented on code in PR #13600:
URL: https://github.com/apache/hudi/pull/13600#discussion_r2252821292
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -411,30 +416,40 @@ private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecordWithEx
HoodieRecord<R> incoming,
HoodieRecord<R> existing,
Schema writeSchema,
- Schema existingSchema,
Schema writeSchemaWithMetaFields,
HoodieWriteConfig config,
- HoodieRecordMerger recordMerger,
- BaseKeyGenerator keyGenerator) throws IOException {
- Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(existing, existingSchema,
- incoming, writeSchemaWithMetaFields, config.getProps());
- if (!mergeResult.isPresent()) {
+ BufferedRecordMerger<R> recordMerger,
+ BaseKeyGenerator keyGenerator,
+ RecordContext<R> incomingRecordContext,
+ RecordContext<R> existingRecordContext,
+ String[] orderingFieldNames) throws IOException {
+ BufferedRecord<R> incomingBufferedRecord = BufferedRecord.forRecordWithContext(incoming, writeSchemaWithMetaFields, incomingRecordContext, config.getProps(), orderingFieldNames);
+ BufferedRecord<R> existingBufferedRecord = BufferedRecord.forRecordWithContext(existing, writeSchemaWithMetaFields, existingRecordContext, config.getProps(), orderingFieldNames);
+ MergeResult<R> mergeResult = recordMerger.finalMerge(existingBufferedRecord, incomingBufferedRecord);
+ if (mergeResult.isDelete()) {
//the record was deleted
return Option.empty();
}
- HoodieRecord<R> result = mergeResult.get().getLeft();
+ if (mergeResult.getMergedRecord() == null) {
+ // SENTINEL case: the record did not match and merge case and should not be modified
+ return Option.of((HoodieRecord<R>) new HoodieAvroIndexedRecord(HoodieRecord.SENTINEL));
+ }
+
+ BufferedRecord<R> resultingBufferedRecord = BufferedRecord.forRecordWithContext(mergeResult.getMergedRecord(), writeSchemaWithMetaFields,
+ existingRecordContext, orderingFieldNames, existing.getRecordKey(), false);
+ HoodieRecord<R> result = existingRecordContext.constructHoodieRecord(resultingBufferedRecord);
+
if (result.getData().equals(HoodieRecord.SENTINEL)) {
//the record did not match and merge case and should not be modified
return Option.of(result);
}
//record is inserted or updated
- String partitionPath = keyGenerator.getPartitionPath((GenericRecord) result.getData());
+ String partitionPath = keyGenerator.getPartitionPath(existingRecordContext.convertToAvroRecord(mergeResult.getMergedRecord(), writeSchemaWithMetaFields));
Review Comment:
> two are not guaranteed to have the same partition path

Okay, how can the two be merged if they do not belong to the same partition?

> whether the output data is the same instance as the existing record or new record

Let's do that.
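
For reference, the identity check could look roughly like the sketch below. It uses hypothetical stand-in types (`Rec`, `knownPartitionPath`), not the actual Hudi classes, and assumes the merger returns one of its two inputs unchanged in the common case. When the merged output is a new instance, the caller would still need to fall back to the key generator.

```java
import java.util.Optional;

public class PartitionPathChoice {

  // Hypothetical stand-in for a record whose partition path is already known.
  // This is NOT a Hudi class; it only illustrates the identity check.
  static final class Rec {
    final String key;
    final String partitionPath;

    Rec(String key, String partitionPath) {
      this.key = key;
      this.partitionPath = partitionPath;
    }
  }

  // If the merge output is the very same instance as one of the inputs,
  // reuse that input's partition path. Otherwise return empty so the caller
  // can compute the path another way (e.g. via a key generator).
  static Optional<String> knownPartitionPath(Rec merged, Rec existing, Rec incoming) {
    if (merged == existing) {
      return Optional.of(existing.partitionPath);
    }
    if (merged == incoming) {
      return Optional.of(incoming.partitionPath);
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    Rec existing = new Rec("k1", "2024/01/01");
    Rec incoming = new Rec("k1", "2024/01/02");
    Rec combined = new Rec("k1", "2024/01/02");

    System.out.println(knownPartitionPath(existing, existing, incoming));
    System.out.println(knownPartitionPath(incoming, existing, incoming));
    System.out.println(knownPartitionPath(combined, existing, incoming));
  }
}
```

The comparison is by reference (`==`) on purpose: an equal-looking but newly built record says nothing about which partition it came from.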