linliu-code commented on code in PR #13115:
URL: https://github.com/apache/hudi/pull/13115#discussion_r2035927580
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -275,43 +274,47 @@ protected Option<Pair<Option<T>, Map<String, Object>>>
doProcessNextDataRecord(T
} else {
switch (recordMergeMode) {
case COMMIT_TIME_ORDERING:
- return Option.empty();
+ return Option.of(Pair.of(Option.ofNullable(record), metadata));
case EVENT_TIME_ORDERING:
- Comparable existingOrderingValue = readerContext.getOrderingValue(
- existingRecordMetadataPair.getLeft(),
existingRecordMetadataPair.getRight(),
- readerSchema, orderingFieldName);
- if
(isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(),
existingOrderingValue)) {
- return Option.empty();
- }
- Comparable incomingOrderingValue = readerContext.getOrderingValue(
- Option.of(record), metadata, readerSchema, orderingFieldName);
- if (incomingOrderingValue.compareTo(existingOrderingValue) > 0) {
+ if (shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(),
existingRecordMetadataPair.getRight(), Option.ofNullable(record), metadata)) {
return Option.of(Pair.of(Option.of(record), metadata));
}
return Option.empty();
case CUSTOM:
default:
// Merge and store the combined record
- // Note that the incoming `record` is from an older commit, so it
should be put as
- // the `older` in the merge API
if (payloadClass.isPresent()) {
+ if (existingRecordMetadataPair.getLeft().isEmpty()
+ &&
shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(),
existingRecordMetadataPair.getRight(), Option.ofNullable(record), metadata)) {
+ // IMPORTANT:
+ // this is needed when the fallback HoodieAvroRecordMerger got
used, the merger would
+ // return Option.empty when the old payload data is empty(a
delete) and ignores its ordering value directly.
+ return Option.of(Pair.of(Option.of(record), metadata));
+ }
Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt =
- getMergedRecord(Option.of(record), metadata,
existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight());
+ getMergedRecord(existingRecordMetadataPair.getLeft(),
existingRecordMetadataPair.getRight(), Option.of(record), metadata);
if (combinedRecordAndSchemaOpt.isPresent()) {
T combinedRecordData =
readerContext.convertAvroRecord((IndexedRecord)
combinedRecordAndSchemaOpt.get().getLeft().getData());
// If pre-combine does not return existing record, update it
- if (combinedRecordData !=
existingRecordMetadataPair.getLeft().get()) {
+ if (combinedRecordData !=
existingRecordMetadataPair.getLeft().orElse(null)) {
return
Option.of(Pair.of(Option.ofNullable(combinedRecordData), metadata));
}
}
return Option.empty();
} else {
+ if (existingRecordMetadataPair.getLeft().isEmpty()
+ &&
shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(),
existingRecordMetadataPair.getRight(), Option.ofNullable(record), metadata)) {
+ // IMPORTANT:
+ // this is needed when the fallback HoodieAvroRecordMerger got
used, the merger would
+ // return Option.empty when the old payload data is empty(a
delete) and ignores its ordering value directly.
+ return Option.of(Pair.of(Option.of(record), metadata));
+ }
Review Comment:
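DRY: this block duplicates L287-L293 (the same empty-existing-record / `shouldKeepNewerRecord` short-circuit in the `payloadClass.isPresent()` branch); please extract it into a shared helper or hoist it above the `if (payloadClass.isPresent())` check.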
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]