codope commented on code in PR #9593:
URL: https://github.com/apache/hudi/pull/9593#discussion_r1339533170
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java:
##########
@@ -44,7 +44,7 @@ public interface HoodieRecordMerger extends Serializable {
* It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we
can translate as having 3 versions A, B, C
* of the single record, both orders of operations applications have to
yield the same result)
*/
- Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws
IOException;
+ Option<Pair<HoodieRecord, Schema>> merge(Option<HoodieRecord> older, Schema
oldSchema, Option<HoodieRecord> newer, Schema newSchema, TypedProperties props)
throws IOException;
Review Comment:
Let's update the javadoc to define the parameters as well.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java:
##########
@@ -43,9 +43,29 @@ public String getMergingStrategy() {
}
@Override
- public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema
oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws
IOException {
- return combineAndGetUpdateValue(older, newer, newSchema, props)
- .map(r -> Pair.of(new HoodieAvroIndexedRecord(r), r.getSchema()));
+ public Option<Pair<HoodieRecord, Schema>> merge(Option<HoodieRecord> older,
+ Schema oldSchema,
+ Option<HoodieRecord> newer,
+ Schema newSchema,
+ TypedProperties props)
throws IOException {
+ boolean isValidNew = isValid(newer);
+ boolean isValidOld = isValid(older);
+ boolean isDeleteNew = isValidNew && isDelete(newer, newSchema, props);
+
+ if (!isValidOld && !isValidNew) { // No meaningful information found.
+ return Option.empty();
+ } else if (isValidOld && !isValidNew) { // Return old record for data
safety.
+ return Option.of(Pair.of(older.get(), oldSchema));
+ } else if (!isValidOld) { // Either insert or delete case, return the new
record.
+ return Option.of(Pair.of(newer.get(), newSchema));
+ } else {
+ if (isDeleteNew) { // delete case
+ return Option.of(Pair.of(newer.get(), newSchema));
+ }
Review Comment:
I guess this is all the complication that @danny0405 mentioned. I think we
should still check for validity before calling `combineAndGetUpdateValue`.
However, we can move the handling of insert/delete to a separate method that
can be overridden. The default would be to return the new record.
IMO, one method is cleaner and easier to use. Also, I checked what Kafka and
Flink do. They also provide one method and leave the handling of empty/null
keys to the concrete implementation.
Kafka Merger -
https://kafka.apache.org/35/javadoc/org/apache/kafka/streams/kstream/Merger.html
Flink InternalRowMerger -
https://nightlies.apache.org/flink/flink-docs-release-1.17/api/java/org/apache/flink/table/examples/java/functions/InternalRowMergerFunction.html
@danny0405 What do you think?
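To make the suggestion concrete, here is a minimal sketch of the shape I have in mind. It uses stand-in types rather than the real Hudi classes: `Rec`, `SketchMerger`, `handleInsertOrDelete`, and `combine` are hypothetical names, and `java.util.Optional` stands in for Hudi's `Option`. Schemas and properties are left out for brevity.

```java
import java.util.Optional;

// Stand-in for HoodieRecord (assumption); only the fields this sketch needs.
final class Rec {
  final String key;
  final String value;
  final boolean delete;

  Rec(String key, String value, boolean delete) {
    this.key = key;
    this.value = value;
    this.delete = delete;
  }
}

// One public merge() entry point. Validity is checked up front, and the
// insert/delete decision lives in a separate hook that subclasses can override.
class SketchMerger {

  public final Optional<Rec> merge(Optional<Rec> older, Optional<Rec> newer) {
    if (!newer.isPresent()) {
      // Nothing new to apply: keep the old record if there is one,
      // otherwise this is empty (no meaningful information).
      return older;
    }
    if (!older.isPresent() || newer.get().delete) {
      // Insert (no old record) or delete marker: delegate to the hook.
      return handleInsertOrDelete(older, newer.get());
    }
    // Both sides are valid and this is a plain update.
    return combine(older.get(), newer.get());
  }

  // Default behavior: return the new record. Override to customize.
  protected Optional<Rec> handleInsertOrDelete(Optional<Rec> older, Rec newer) {
    return Optional.of(newer);
  }

  // Placeholder for combineAndGetUpdateValue; here the newer value simply wins.
  protected Optional<Rec> combine(Rec older, Rec newer) {
    return Optional.of(new Rec(older.key, newer.value, false));
  }
}
```

With this layout a custom merger only overrides `handleInsertOrDelete` (or `combine`) and callers still see a single `merge` method.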
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java:
##########
@@ -56,7 +76,8 @@ public HoodieRecordType getRecordType() {
private Option<IndexedRecord> combineAndGetUpdateValue(HoodieRecord older,
HoodieRecord newer, Schema schema, Properties props) throws IOException {
Option<IndexedRecord> previousAvroData = older.toIndexedRecord(schema,
props).map(HoodieAvroIndexedRecord::getData);
if (!previousAvroData.isPresent()) {
- return Option.empty();
+ Option<IndexedRecord> newData = newer.toIndexedRecord(schema,
props).map(HoodieAvroIndexedRecord::getData);
+ return newData;
Review Comment:
Yeah, I think this covers `HoodieRecordPayload#getInsertValue`, because
`previousAvroData` not being present means no such record exists on storage.
Ideally, we shouldn't even reach this if-block: when the older record is
empty (not valid), the code returns from line 60 of the `merge` method.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java:
##########
@@ -66,4 +87,23 @@ private Option<IndexedRecord>
combineAndGetUpdateValue(HoodieRecord older, Hoodi
public HoodieRecordMerger asPreCombiningMode() {
return HoodiePreCombineAvroRecordMerger.INSTANCE;
}
+
+ /**
+ * If a record is valid, it means it tells the merger something meaningful.
+ * Otherwise, nothing meaningful.
+ */
+ private boolean isValid(Option<HoodieRecord> record) {
+ return record.isPresent()
+ && record.get().getRecordType() == HoodieRecordType.AVRO;
+ }
+
+ /**
+ * Check if a DELETE operation is intended.
+ */
+ private boolean isDelete(Option<HoodieRecord> record, Schema schema,
TypedProperties props) throws IOException {
Review Comment:
Can we move this method and the one above to a suitable util class in
`hudi-common`, or maybe define them as static methods in the
`HoodieRecordMerger` interface? We can reuse them across mergers and/or the
write handle, as the implementation does not differ.
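For the static-method option, a rough sketch of what I mean is below. All types are stand-ins, not the real Hudi classes: `SketchRecordType`, `SketchRecord`, and `SketchRecordMerger` are hypothetical, `java.util.Optional` stands in for Hudi's `Option`, and the delete check is a placeholder flag rather than the actual logic in this PR.

```java
import java.util.Optional;

// Stand-in for HoodieRecordType (assumption).
enum SketchRecordType { AVRO, SPARK }

// Stand-in for HoodieRecord (assumption).
final class SketchRecord {
  final SketchRecordType type;
  final boolean deleteMarker;

  SketchRecord(SketchRecordType type, boolean deleteMarker) {
    this.type = type;
    this.deleteMarker = deleteMarker;
  }
}

// Stand-in for the HoodieRecordMerger interface. Static interface methods
// (Java 8+) let every merger and the write handle share the same helpers.
interface SketchRecordMerger {

  SketchRecordType getRecordType();

  // A record is valid when it is present and has the expected record type.
  static boolean isValid(Optional<SketchRecord> record, SketchRecordType expected) {
    return record.isPresent() && record.get().type == expected;
  }

  // Placeholder delete check: the real one would inspect the record
  // with its schema and properties.
  static boolean isDelete(Optional<SketchRecord> record) {
    return record.isPresent() && record.get().deleteMarker;
  }
}
```

Callers would then write `SketchRecordMerger.isValid(older, getRecordType())` instead of each merger carrying its own private copy.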
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.