the-other-tim-brown commented on code in PR #13742:
URL: https://github.com/apache/hudi/pull/13742#discussion_r2302521697
##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/BufferedRecordMergerFactory.java:
##########
@@ -567,42 +553,36 @@ public Option<BufferedRecord<T>>
deltaMerge(BufferedRecord<T> newRecord, Buffere
if (existingRecord == null) {
return Option.of(newRecord);
}
- if (existingRecord.isDelete() || newRecord.isDelete()) {
- if (shouldKeepNewerRecord(existingRecord, newRecord)) {
- // IMPORTANT:
- // this is needed when the fallback HoodieAvroRecordMerger got used,
the merger would
- // return Option.empty when the old payload data is empty(a delete)
and ignores its ordering value directly.
- return Option.of(newRecord);
- } else {
- return Option.empty();
- }
- }
- return deltaMergeNonDeleteRecord(newRecord, existingRecord);
+ return deltaMergeRecords(newRecord, existingRecord);
}
- public abstract Option<BufferedRecord<T>>
deltaMergeNonDeleteRecord(BufferedRecord<T> newRecord, BufferedRecord<T>
existingRecord) throws IOException;
+ public abstract Option<BufferedRecord<T>>
deltaMergeRecords(BufferedRecord<T> newRecord, BufferedRecord<T>
existingRecord) throws IOException;
@Override
public Option<DeleteRecord> deltaMerge(DeleteRecord deleteRecord,
BufferedRecord<T> existingRecord) {
- return deltaMergeDeleteRecord(deleteRecord, existingRecord);
+ BufferedRecord<T> deleteBufferedRecord =
BufferedRecords.fromDeleteRecord(deleteRecord, recordContext);
+ try {
+ Option<BufferedRecord<T>> merged = deltaMerge(deleteBufferedRecord,
existingRecord);
+ // If the delete record is chosen, return an option with the delete
record, otherwise return empty.
+ return merged.isPresent() ? Option.of(deleteRecord) : Option.empty();
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to process delete record", e);
+ }
}
@Override
public BufferedRecord<T> finalMerge(BufferedRecord<T> olderRecord,
BufferedRecord<T> newerRecord) throws IOException {
- if (olderRecord.isDelete() || newerRecord.isDelete()) {
- if (shouldKeepNewerRecord(olderRecord, newerRecord)) {
- // IMPORTANT:
- // this is needed when the fallback HoodieAvroRecordMerger got used,
the merger would
- // return Option.empty when the new payload data is empty(a delete)
and ignores its ordering value directly.
- return newerRecord;
- } else {
- return olderRecord;
- }
+ if (olderRecord == null) {
+ return newerRecord;
+ }
+ // handle special case for deletes that are sent to older partitions in
global-index, this delete takes precedence regardless of the previous value
+ if (newerRecord.getHoodieOperation() == HoodieOperation.UPDATE_BEFORE) {
Review Comment:
Right now the Merge-Into tests that use a global index will fail without this
check. It will require some more investigation into why that is the case.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]