yihua commented on code in PR #12597:
URL: https://github.com/apache/hudi/pull/12597#discussion_r1934850277
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java:
##########
@@ -44,40 +47,46 @@ public String getMergingStrategy() {
   }

   @Override
-  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
+  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older,
+                                                  Schema oldSchema,
+                                                  HoodieRecord newer,
+                                                  Schema newSchema,
+                                                  TypedProperties props)
+      throws IOException {
     ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.SPARK);
     ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.SPARK);
-    if (newer instanceof HoodieSparkRecord) {
-      HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
-      if (newSparkRecord.isDelete(newSchema, props)) {
-        // Delete record
-        return Option.empty();
-      }
-    } else {
-      if (newer.getData() == null) {
-        // Delete record
-        return Option.empty();
-      }
+    if (newer instanceof HoodieEmptyRecord) {
+      return Option.empty();
Review Comment:
What is the expected return value of the `#merge` API? If the result is a
delete record, should we still keep a record carrying the ordering value,
i.e., a `HoodieEmptyRecord`? Otherwise, how does a subsequent merge operation
know the ordering value of this delete record?
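The concern above can be made concrete with a minimal, self-contained sketch (not Hudi code; `Rec`, its fields, and this simplified `merge` are all hypothetical stand-ins for `HoodieEmptyRecord` and the real merger). If a delete is kept as a tombstone that still carries its ordering value, a later merge against a stale insert can compare ordering values and let the delete win; if the delete had been collapsed to "empty", that comparison would be impossible.

```java
import java.util.Optional;

public class TombstoneOrderingSketch {
  // Hypothetical simplified record: an ordering value plus a delete flag.
  record Rec(long orderingVal, boolean isDelete) {}

  // Merge that keeps a tombstone for deletes instead of dropping them,
  // so the delete's ordering value stays available to later merges.
  static Optional<Rec> merge(Optional<Rec> older, Rec newer) {
    if (older.isEmpty() || newer.orderingVal() >= older.get().orderingVal()) {
      return Optional.of(newer); // newer wins; may itself be a tombstone
    }
    return older; // older wins; may itself be a tombstone
  }

  public static void main(String[] args) {
    Rec delete = new Rec(5, true);   // delete with ordering value 5
    Rec insert = new Rec(3, false);  // stale insert with ordering value 3

    // Because the tombstone kept ordering value 5, it beats the insert.
    Optional<Rec> afterDelete = merge(Optional.empty(), delete);
    Optional<Rec> result = merge(afterDelete, insert);
    System.out.println(result.get().isDelete()); // prints: true
  }
}
```

Had the first merge returned `Optional.empty()` for the delete, the second merge would have resurrected the stale insert, which is exactly the hazard the comment raises.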
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/DefaultSparkRecordMerger.java:
##########
@@ -44,40 +47,46 @@ public String getMergingStrategy() {
   }

   @Override
-  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
+  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older,
+                                                  Schema oldSchema,
+                                                  HoodieRecord newer,
+                                                  Schema newSchema,
+                                                  TypedProperties props)
+      throws IOException {
     ValidationUtils.checkArgument(older.getRecordType() == HoodieRecordType.SPARK);
     ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecordType.SPARK);
-    if (newer instanceof HoodieSparkRecord) {
-      HoodieSparkRecord newSparkRecord = (HoodieSparkRecord) newer;
-      if (newSparkRecord.isDelete(newSchema, props)) {
-        // Delete record
-        return Option.empty();
-      }
-    } else {
-      if (newer.getData() == null) {
-        // Delete record
-        return Option.empty();
-      }
+    if (newer instanceof HoodieEmptyRecord) {
+      return Option.empty();
+    }
+    HoodieSparkRecord newRecord = (HoodieSparkRecord) newer;
+    if (older instanceof HoodieEmptyRecord) {
+      return newRecord.isDelete(newSchema, props)
+          ? Option.empty() : Option.of(Pair.of(newer, newSchema));
     }
+    HoodieSparkRecord oldRecord = (HoodieSparkRecord) older;
+    Comparable newOrderingVal = newRecord.getOrderingValue(newSchema, props);
+    Comparable oldOrderingVal = oldRecord.getOrderingValue(oldSchema, props);
-    if (older instanceof HoodieSparkRecord) {
-      HoodieSparkRecord oldSparkRecord = (HoodieSparkRecord) older;
-      if (oldSparkRecord.isDelete(oldSchema, props)) {
-        // use natural order for delete record
-        return Option.of(Pair.of(newer, newSchema));
-      }
-    } else {
-      if (older.getData() == null) {
-        // use natural order for delete record
-        return Option.of(Pair.of(newer, newSchema));
-      }
+    // The same logic as the file group reader.
+    // CASE 1: New record is a delete record with natural order.
+    if (newOrderingVal.equals(DEFAULT_ORDERING_VALUE)
+        && newRecord.isDelete(newSchema, props)) {
+      return Option.empty();
     }
-    if (older.getOrderingValue(oldSchema, props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
-      return Option.of(Pair.of(older, oldSchema));
-    } else {
-      return Option.of(Pair.of(newer, newSchema));
+    // When the old record has a valid ordering value that is higher
+    // than that of the new record:
+    // CASE 2: old record is a delete record, return empty.
+    // CASE 3: old record is not a delete record, return the old record.
+    if (!oldOrderingVal.equals(DEFAULT_ORDERING_VALUE)
+        && oldOrderingVal.compareTo(newOrderingVal) > 0) {
+      return oldRecord.isDelete(oldSchema, props)
+          ? Option.empty() : Option.of(Pair.of(older, oldSchema));
     }
+    // Otherwise, the result is determined by the new record.
+    // CASE 4: new record is a delete record, return empty.
+    // CASE 5: new record is not a delete record, return the new record.
+    return newRecord.isDelete(newSchema, props)
+        ? Option.empty() : Option.of(Pair.of(newer, newSchema));
Review Comment:
This seems complex. Should we just return the winning record based on the
ordering value, without checking for deletes, and let the caller of `#merge`
drop delete records instead?
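The simplification suggested above can be sketched in a minimal, self-contained form (not Hudi code; `Rec`, `merge`, and `mergeAndFilter` are hypothetical stand-ins): the merger picks a winner purely by ordering value, and filtering out deletes becomes a separate, caller-side step.

```java
import java.util.Optional;

public class OrderingOnlyMergeSketch {
  // Hypothetical simplified record: an ordering value plus a delete flag.
  record Rec(long orderingVal, boolean isDelete) {}

  // Pure ordering-based merge: no delete handling at all. Ties go to
  // the newer record, mirroring the ">" comparison in the diff above.
  static Rec merge(Rec older, Rec newer) {
    return older.orderingVal() > newer.orderingVal() ? older : newer;
  }

  // Caller-side filtering: drop the winner if it is a delete record.
  static Optional<Rec> mergeAndFilter(Rec older, Rec newer) {
    Rec winner = merge(older, newer);
    return winner.isDelete() ? Optional.empty() : Optional.of(winner);
  }

  public static void main(String[] args) {
    Rec oldDelete = new Rec(5, true);
    Rec newInsert = new Rec(3, false);
    // The delete with the higher ordering value wins the merge,
    // then the filtering step removes it from the output.
    System.out.println(mergeAndFilter(oldDelete, newInsert).isEmpty()); // prints: true
  }
}
```

Splitting the two concerns this way keeps the merge function a simple two-way comparison, at the cost of requiring every caller to apply the delete filter consistently.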
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]