yihua commented on code in PR #9593:
URL: https://github.com/apache/hudi/pull/9593#discussion_r1316379275
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java:
##########
@@ -43,8 +43,11 @@ public String getMergingStrategy() {
}
@Override
- public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
- return combineAndGetUpdateValue(older, newer, newSchema, props)
+ public Option<Pair<HoodieRecord, Schema>> merge(Option<HoodieRecord> older, Schema oldSchema, Option<HoodieRecord> newer, Schema newSchema, TypedProperties props) throws IOException {
+ if (!older.isPresent() || !newer.isPresent()) {
+ return Option.empty();
+ }
+ return combineAndGetUpdateValue(older.get(), newer.get(), newSchema, props)
Review Comment:
Correct me if I'm wrong, but here are the four cases we need to handle:
(1) older: `absent`, newer: `absent`
-> return `absent`
(2) older: `exist`, newer: `absent`
-> follow the delete logic; the `merge` API must be called (e.g., for
event-time-based merging, if the ordering value of `newer` is smaller, `older`
should be returned; for overwrite-with-latest, `absent` should be returned)
(3) older: `exist`, newer: `exist`
-> follow the update logic; the `merge` API must be called for the actual merging
(4) older: `absent`, newer: `exist`
-> follow the insert logic, i.e., `getInsertValue` should be called
After digging into the code more, the other thing to pay attention to is that
`newer` can be `Option.empty()`, `Option.of(HoodieEmptyRecord<T>)`, or
`Option.of(HoodieRecord<EmptyHoodieRecordPayload>)` to represent a delete
(these all count as `absent` in the logic above). So this should be handled
in each `HoodieRecord` implementation.
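To make the four cases concrete, here is a minimal sketch of the dispatch. It is only an illustration: it uses `java.util.Optional` instead of Hudi's `Option`, and `Rec`, `deleteMarker`, `MergeCase`, and `classify` are hypothetical names, not Hudi classes or methods. The delete marker stands in for the empty-record representations mentioned above.

```java
import java.util.Optional;

// Hypothetical sketch; none of these types are Hudi classes.
class MergeCaseSketch {
    enum MergeCase { BOTH_ABSENT, DELETE, UPDATE, INSERT }

    // Stand-in for a record; `deleteMarker` plays the role of an empty/delete record.
    static final class Rec {
        final String key;
        final boolean deleteMarker;

        Rec(String key, boolean deleteMarker) {
            this.key = key;
            this.deleteMarker = deleteMarker;
        }
    }

    // `newer` counts as absent when the Optional is empty or wraps a delete marker.
    static boolean newerIsAbsent(Optional<Rec> newer) {
        return !newer.isPresent() || newer.get().deleteMarker;
    }

    // Maps the (older, newer) combination to one of the four cases in the comment.
    static MergeCase classify(Optional<Rec> older, Optional<Rec> newer) {
        boolean olderPresent = older.isPresent();
        boolean newerPresent = !newerIsAbsent(newer);
        if (!olderPresent && !newerPresent) {
            return MergeCase.BOTH_ABSENT; // case (1): nothing to return
        }
        if (olderPresent && !newerPresent) {
            return MergeCase.DELETE;      // case (2): delete logic, merger decides
        }
        if (olderPresent) {
            return MergeCase.UPDATE;      // case (3): actual merge
        }
        return MergeCase.INSERT;          // case (4): insert logic
    }
}
```

The sketch only classifies the combination; what each case returns (for example, keeping `older` in case (2) under event-time ordering) is left to the merger implementation.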
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java:
##########
@@ -43,8 +43,11 @@ public String getMergingStrategy() {
}
@Override
- public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
- return combineAndGetUpdateValue(older, newer, newSchema, props)
+ public Option<Pair<HoodieRecord, Schema>> merge(Option<HoodieRecord> older, Schema oldSchema, Option<HoodieRecord> newer, Schema newSchema, TypedProperties props) throws IOException {
+ if (!older.isPresent() || !newer.isPresent()) {
Review Comment:
Should this be `!older.isPresent() && !newer.isPresent()`?
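A small sketch of why the operator matters, using `java.util.Optional` as a stand-in for Hudi's `Option` (the class and method names are hypothetical): with `||` the guard returns early whenever either side is missing, while with `&&` it returns early only when both are missing.

```java
import java.util.Optional;

// Hypothetical sketch comparing the two guard conditions; not Hudi code.
class GuardComparisonSketch {
    // Guard as written in the diff: true if either side is missing.
    static boolean returnsEarlyWithOr(Optional<?> older, Optional<?> newer) {
        return !older.isPresent() || !newer.isPresent();
    }

    // Guard as suggested in the comment: true only if both sides are missing.
    static boolean returnsEarlyWithAnd(Optional<?> older, Optional<?> newer) {
        return !older.isPresent() && !newer.isPresent();
    }

    public static void main(String[] args) {
        Optional<String> present = Optional.of("record");
        Optional<String> absent = Optional.empty();
        Optional<?>[][] combos = {
            {absent, absent}, {present, absent}, {present, present}, {absent, present}
        };
        for (Optional<?>[] c : combos) {
            System.out.println("older=" + c[0].isPresent()
                + " newer=" + c[1].isPresent()
                + " ||:" + returnsEarlyWithOr(c[0], c[1])
                + " &&:" + returnsEarlyWithAnd(c[0], c[1]));
        }
    }
}
```

Under `||`, the "older present, newer missing" and "older missing, newer present" combinations never reach the merge logic; under `&&` they do.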
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java:
##########
@@ -346,7 +346,8 @@ public void write(HoodieRecord<T> oldRecord) {
// writing the first record. So make a copy of the record to be merged
HoodieRecord<T> newRecord = keyToNewRecords.get(key).newInstance();
try {
- Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(oldRecord, oldSchema, newRecord, newSchema, props);
+ Option<Pair<HoodieRecord, Schema>> mergeResult = recordMerger.merge(
Review Comment:
I agree with @linliu-code that the merge handle (`HoodieMergeHandle`) here
only handles the updates associated with the existing base file (in this handle,
each record key in `keyToNewRecords` should always have a corresponding record
in the base file for merging, at least on the Spark side). @beyond1920 we can
check the logic in other places where inserts should go through the `merge` API.
Is that what you mean?