codope commented on code in PR #9593:
URL: https://github.com/apache/hudi/pull/9593#discussion_r1339533170


##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecordMerger.java:
##########
@@ -44,7 +44,7 @@ public interface HoodieRecordMerger extends Serializable {
    * It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we can translate as having 3 versions A, B, C
    * of the single record, both orders of operations applications have to yield the same result)
    */
-  Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException;
+  Option<Pair<HoodieRecord, Schema>> merge(Option<HoodieRecord> older, Schema oldSchema, Option<HoodieRecord> newer, Schema newSchema, TypedProperties props) throws IOException;

Review Comment:
   Let's update the javadoc to define the parameters as well.
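For illustration, a javadoc that defines every parameter might look like the sketch below. It is a hypothetical, simplified stand-in (`RecordMergerSketch`, `LatestWinsMerger`), not Hudi's actual interface: the type parameters `R` and `S` play the roles of `HoodieRecord` and `Schema`, and `java.util.Optional`, `Map.Entry`, and `Properties` replace Hudi's `Option`, `Pair`, and `TypedProperties` so that it compiles without Hudi on the classpath. The parameter descriptions are assumptions based on this PR's diff.

```java
import java.io.IOException;
import java.util.AbstractMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

/**
 * Hypothetical, simplified stand-in for HoodieRecordMerger; R plays the role of
 * HoodieRecord and S the role of Schema.
 */
interface RecordMergerSketch<R, S> {

  /**
   * Merges two versions of the same record key into at most one version.
   *
   * <p>The operation should be associative: f(a, f(b, c)) = f(f(a, b), c), i.e. for
   * three versions A, B, C of a single record, both orders of application yield
   * the same result.
   *
   * @param older     the previous version (e.g. the one already on storage), or
   *                  empty if no such version exists
   * @param oldSchema the schema {@code older} was written with
   * @param newer     the incoming version, or empty if there is none
   * @param newSchema the schema {@code newer} was written with
   * @param props     merger configuration
   * @return the merged record paired with its schema, or empty if neither input
   *         carries a meaningful record
   * @throws IOException if a record cannot be read or converted
   */
  Optional<Map.Entry<R, S>> merge(Optional<R> older, S oldSchema,
                                  Optional<R> newer, S newSchema,
                                  Properties props) throws IOException;
}

/** Hypothetical example implementation: the newer version wins whenever it is present. */
class LatestWinsMerger implements RecordMergerSketch<String, String> {

  @Override
  public Optional<Map.Entry<String, String>> merge(Optional<String> older, String oldSchema,
                                                   Optional<String> newer, String newSchema,
                                                   Properties props) {
    if (newer.isPresent()) {
      return Optional.of(entry(newer.get(), newSchema));
    }
    // No newer version: fall back to the older one (or empty if that is absent too).
    return older.map(o -> entry(o, oldSchema));
  }

  private static Map.Entry<String, String> entry(String record, String schema) {
    return new AbstractMap.SimpleImmutableEntry<>(record, schema);
  }
}
```

"Last present version wins" is associative, so this toy implementation satisfies the contract stated in the javadoc.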



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java:
##########
@@ -43,9 +43,29 @@ public String getMergingStrategy() {
   }
 
   @Override
-  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
-    return combineAndGetUpdateValue(older, newer, newSchema, props)
-        .map(r -> Pair.of(new HoodieAvroIndexedRecord(r), r.getSchema()));
+  public Option<Pair<HoodieRecord, Schema>> merge(Option<HoodieRecord> older,
+                                                  Schema oldSchema,
+                                                  Option<HoodieRecord> newer,
+                                                  Schema newSchema,
+                                                  TypedProperties props) throws IOException {
+    boolean isValidNew = isValid(newer);
+    boolean isValidOld = isValid(older);
+    boolean isDeleteNew = isValidNew && isDelete(newer, newSchema, props);
+
+    if (!isValidOld && !isValidNew) { // No meaningful information found.
+      return Option.empty();
+    } else if (isValidOld && !isValidNew) { // Return old record for data safety.
+      return Option.of(Pair.of(older.get(), oldSchema));
+    } else if (!isValidOld) { // Either insert or delete case, return the new record.
+      return Option.of(Pair.of(newer.get(), newSchema));
+    } else {
+      if (isDeleteNew) { // delete case
+        return Option.of(Pair.of(newer.get(), newSchema));
+      }

Review Comment:
   I guess this is all the complication that @danny0405 mentioned. I think we should still check validity before calling `combineAndGetUpdateValue`. However, we can move the insert/delete handling into a separate method that can be overridden, which by default returns the new record.
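A minimal sketch of that shape, assuming plain `java.util.Optional` in place of Hudi's `Option` and a generic record type `R`: one `merge` entry point runs the validity checks before the combine step (the role `combineAndGetUpdateValue` plays here), and insert/delete handling goes through an overridable hook that returns the new record by default. `ValidatingMerger`, `handleInsertOrDelete`, `combine`, and `ConcatMerger` are hypothetical names, not existing Hudi APIs.

```java
import java.util.Optional;

/** Hypothetical base class: a single merge() entry point with overridable hooks. */
abstract class ValidatingMerger<R> {

  public final Optional<R> merge(Optional<R> older, Optional<R> newer) {
    boolean validOld = older.isPresent() && isValid(older.get());
    boolean validNew = newer.isPresent() && isValid(newer.get());

    if (!validOld && !validNew) {
      return Optional.empty();                  // nothing meaningful on either side
    }
    if (!validNew) {
      return older;                             // keep the old record for data safety
    }
    if (!validOld || isDelete(newer.get())) {
      return handleInsertOrDelete(newer.get()); // insert or delete: overridable hook
    }
    // Both versions are valid and the new one is an update: combine them.
    return Optional.of(combine(older.get(), newer.get()));
  }

  /** Insert/delete hook; by default the new record wins. Subclasses may override. */
  protected Optional<R> handleInsertOrDelete(R newer) {
    return Optional.of(newer);
  }

  /** Whether a record carries meaningful information; everything is valid by default. */
  protected boolean isValid(R record) {
    return true;
  }

  protected abstract boolean isDelete(R record);

  protected abstract R combine(R older, R newer);
}

/** Hypothetical concrete merger over strings, for illustration only. */
class ConcatMerger extends ValidatingMerger<String> {

  @Override
  protected boolean isValid(String record) {
    return !record.isEmpty();                   // empty string = nothing meaningful
  }

  @Override
  protected boolean isDelete(String record) {
    return record.startsWith("-");              // "-" prefix marks a delete
  }

  @Override
  protected String combine(String older, String newer) {
    return older + "|" + newer;
  }
}
```

Because `merge` is `final`, every subclass gets the same validity checks, and only the hooks vary.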
   
   IMO, a single method is cleaner and easier to use. I also checked what Kafka and Flink do: both provide a single method and leave the handling of empty/null keys to the concrete implementation.
   Kafka Merger - 
https://kafka.apache.org/35/javadoc/org/apache/kafka/streams/kstream/Merger.html
   Flink InternalRowMerger - 
https://nightlies.apache.org/flink/flink-docs-release-1.17/api/java/org/apache/flink/table/examples/java/functions/InternalRowMergerFunction.html
   
   @danny0405 What do you think? 



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java:
##########
@@ -56,7 +76,8 @@ public HoodieRecordType getRecordType() {
   private Option<IndexedRecord> combineAndGetUpdateValue(HoodieRecord older, HoodieRecord newer, Schema schema, Properties props) throws IOException {
     Option<IndexedRecord> previousAvroData = older.toIndexedRecord(schema, props).map(HoodieAvroIndexedRecord::getData);
     if (!previousAvroData.isPresent()) {
-      return Option.empty();
+      Option<IndexedRecord> newData = newer.toIndexedRecord(schema, props).map(HoodieAvroIndexedRecord::getData);
+      return newData;

Review Comment:
   Yeah, I think this covers `HoodieRecordPayload#getInsertValue`, because `previousAvroData` not being present means no such record exists on storage. Ideally, we shouldn't even reach this if-block: the older record is empty (not valid), so the code already returns at line 60 of the `merge` method (the `!isValidOld` branch).
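The fallback reduces to a small generic sketch. Names are hypothetical: `T` stands in for `IndexedRecord`, `java.util.Optional` for Hudi's `Option`, and `combiner` for the payload's update semantics; it also assumes the incoming version always converts successfully. When nothing exists on storage, the call degenerates to an insert and returns the incoming data, which is the role `HoodieRecordPayload#getInsertValue` plays.

```java
import java.util.Optional;
import java.util.function.BiFunction;

final class CombineFallbackSketch {

  private CombineFallbackSketch() {
  }

  /**
   * Hypothetical reduction of combineAndGetUpdateValue.
   *
   * @param previous the version on storage, or empty if none exists
   * @param incoming the new version
   * @param combiner update semantics applied when both versions exist; may
   *                 return empty (e.g. to express a delete)
   */
  static <T> Optional<T> combineAndGetUpdateValue(Optional<T> previous, T incoming,
                                                  BiFunction<T, T, Optional<T>> combiner) {
    if (!previous.isPresent()) {
      // Nothing on storage: treat it as an insert and return the incoming data.
      return Optional.of(incoming);
    }
    // Both versions exist: apply the update semantics.
    return combiner.apply(previous.get(), incoming);
  }
}
```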



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecordMerger.java:
##########
@@ -66,4 +87,23 @@ private Option<IndexedRecord> combineAndGetUpdateValue(HoodieRecord older, Hoodi
   public HoodieRecordMerger asPreCombiningMode() {
     return HoodiePreCombineAvroRecordMerger.INSTANCE;
   }
+
+  /**
+   * If a record is valid, it means it tells the merger something meaningful.
+   * Otherwise, nothing meaningful.
+   */
+  private boolean isValid(Option<HoodieRecord> record) {
+    return record.isPresent()
+        && record.get().getRecordType() == HoodieRecordType.AVRO;
+  }
+
+  /**
+   * Check if a DELETE operation is intended.
+   */
+  private boolean isDelete(Option<HoodieRecord> record, Schema schema, TypedProperties props) throws IOException {

Review Comment:
   Can we move this method and the one above to a suitable util class in `hudi-common`, or maybe define them as static methods in the `HoodieRecordMerger` interface? Then we can reuse them across mergers and/or the write handle, since the implementation does not differ.
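If the static-method route is taken, a sketch could look like this (a final util class with a private constructor would work the same way). It is hypothetical: `SketchRecord`, `SketchRecordType`, `SimpleRecord`, and `RecordMergerUtils` stand in for `HoodieRecord`, `HoodieRecordType`, and the proposed shared helpers. The expected record type is passed in, rather than hard-coded to `AVRO`, so that different mergers can reuse `isValid`; `isDeleteMarker()` is a placeholder because the body of `isDelete` is not shown in this diff.

```java
import java.util.Optional;

/** Hypothetical stand-in for HoodieRecordType. */
enum SketchRecordType { AVRO, SPARK }

/** Hypothetical stand-in for HoodieRecord, reduced to what the helpers need. */
interface SketchRecord {
  SketchRecordType getRecordType();

  boolean isDeleteMarker();   // placeholder for whatever isDelete actually inspects
}

/** Hypothetical shared helpers, usable from any merger or write handle. */
interface RecordMergerUtils {

  /** A record is valid when it is present and of the record type the caller expects. */
  static boolean isValid(Optional<? extends SketchRecord> record, SketchRecordType expected) {
    return record.isPresent() && record.get().getRecordType() == expected;
  }

  /** A DELETE is intended when a present record is marked as a delete. */
  static boolean isDelete(Optional<? extends SketchRecord> record) {
    return record.isPresent() && record.get().isDeleteMarker();
  }
}

/** Minimal record implementation used to exercise the helpers. */
final class SimpleRecord implements SketchRecord {
  private final SketchRecordType type;
  private final boolean delete;

  SimpleRecord(SketchRecordType type, boolean delete) {
    this.type = type;
    this.delete = delete;
  }

  @Override
  public SketchRecordType getRecordType() {
    return type;
  }

  @Override
  public boolean isDeleteMarker() {
    return delete;
  }
}
```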



-- 
This is an automated message from the Apache Git Service.