nsivabalan commented on code in PR #13115:
URL: https://github.com/apache/hudi/pull/13115#discussion_r2036318344


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -233,11 +233,12 @@ public void close() {
   /**
    * Merge two log data records if needed.
    *
-   * @param record
-   * @param metadata
-   * @param existingRecordMetadataPair
-   * @return
-   * @throws IOException
+   * @param record                     The record
+   * @param metadata                   The metadata
+   * @param existingRecordMetadataPair The existing record metadata pair
+   *
+   * @return The pair of the record that needs to be updated with and its metadata,
+   * returns empty to skip the update.
    */
   protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T record,

Review Comment:
   Can we rename the first argument to `newRecord` to avoid confusion?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -233,11 +233,12 @@ public void close() {
   /**
    * Merge two log data records if needed.
    *
-   * @param record
-   * @param metadata
-   * @param existingRecordMetadataPair
-   * @return
-   * @throws IOException
+   * @param record                     The record
+   * @param metadata                   The metadata
+   * @param existingRecordMetadataPair The existing record metadata pair
+   *
+   * @return The pair of the record that needs to be updated with and its metadata,
+   * returns empty to skip the update.

Review Comment:
   Can we enhance the Javadoc of this method?
   I would like to see the expected behavior in all the different cases.
   
   Say, for event-time-based merging:
   
   | old | new | expected behavior |
   |-----|-----|-------------------|
   | valid rec w/ valid ord value | valid rec w/ same ord value | new |
   | valid rec w/ valid ord value | valid rec w/ higher ord value | new |
   | valid rec w/ valid ord value | valid rec w/ lower ord value | old |
   | valid rec w/ valid ord value | valid rec w/ missing ord value | new |
   | valid rec w/ valid ord value | deleted w/ same ord value | deleted, but what is the expected return value from this method? |
   | valid rec w/ valid ord value | deleted w/ higher ord value | deleted, but what is the expected return value from this method? |
   | valid rec w/ valid ord value | deleted w/ lower ord value | old |
   | valid rec w/ valid ord value | deleted w/ missing ord value | deleted, but what is the expected return value from this method? |
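
To make the table concrete, here is a minimal, self-contained Java model of the event-time rule in `shouldKeepNewerRecord` from this diff. It is a sketch, not Hudi code, and it rests on these assumptions:
- Records are reduced to `Optional<String>`, where empty means a delete.
- Ordering values are reduced to `Integer`, where `null` means missing.
- A missing ordering value is read as 0, as described later in this thread.
- `isDeleteRecordWithNaturalOrder` means "empty record with ordering value 0", per the code comment about DELETE statements.

`EventTimeRuleSketch`, `keepNewer`, and `decide` are hypothetical names.

```java
import java.util.Optional;

/** Hypothetical model of the event-time rule in shouldKeepNewerRecord; not Hudi code. */
final class EventTimeRuleSketch {
  // Assumption: a missing ordering value is read as 0.
  static final int DEFAULT_ORDERING_VALUE = 0;

  // Assumption: a delete "with natural order" is an empty record carrying the default ordering value.
  static boolean isDeleteWithNaturalOrder(Optional<String> record, int orderingValue) {
    return !record.isPresent() && orderingValue == DEFAULT_ORDERING_VALUE;
  }

  /**
   * Mirrors the diff: natural-order deletes always win; otherwise the new record wins on ties
   * and on higher ordering values. A null newOrdering models a missing ordering value.
   */
  static boolean keepNewer(int oldOrdering, Optional<String> newRecord, Integer newOrdering) {
    int newVal = newOrdering == null ? DEFAULT_ORDERING_VALUE : newOrdering;
    if (isDeleteWithNaturalOrder(newRecord, newVal)) {
      return true;
    }
    return Integer.compare(newVal, oldOrdering) >= 0;
  }

  /** Returns "new" or "old" for one row of the review table. */
  static String decide(int oldOrdering, Optional<String> newRecord, Integer newOrdering) {
    return keepNewer(oldOrdering, newRecord, newOrdering) ? "new" : "old";
  }

  public static void main(String[] args) {
    int old = 5; // old side: valid record with ordering value 5
    Optional<String> data = Optional.of("v");
    Optional<String> delete = Optional.empty();
    System.out.println("valid, same ord      -> " + decide(old, data, 5));
    System.out.println("valid, higher ord    -> " + decide(old, data, 7));
    System.out.println("valid, lower ord     -> " + decide(old, data, 3));
    System.out.println("valid, missing ord   -> " + decide(old, data, null));
    System.out.println("deleted, same ord    -> " + decide(old, delete, 5));
    System.out.println("deleted, higher ord  -> " + decide(old, delete, 7));
    System.out.println("deleted, lower ord   -> " + decide(old, delete, 3));
    System.out.println("deleted, missing ord -> " + decide(old, delete, null));
  }
}
```

**Valid new record with a missing ordering value.** In this model the row resolves to `old`, because 0 is not >= 5. The table above expects `new`. The model yields `new` for that input only when the old ordering value is 0 or negative.

**Delete rows.** With the same or higher ordering value, the delete wins. With a lower value, the old record is kept. A missing value goes through the natural-order branch, so the delete wins.

Rows like these are what the requested Javadoc would pin down.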



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -275,43 +274,47 @@ protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T
       } else {
         switch (recordMergeMode) {
           case COMMIT_TIME_ORDERING:
-            return Option.empty();
+            return Option.of(Pair.of(Option.ofNullable(record), metadata));
           case EVENT_TIME_ORDERING:
-            Comparable existingOrderingValue = readerContext.getOrderingValue(
-                existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(),
-                readerSchema, orderingFieldName);
-            if (isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(), existingOrderingValue)) {
-              return Option.empty();
-            }
-            Comparable incomingOrderingValue = readerContext.getOrderingValue(
-                Option.of(record), metadata, readerSchema, orderingFieldName);
-            if (incomingOrderingValue.compareTo(existingOrderingValue) > 0) {
+            if (shouldKeepNewerRecord(existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), Option.ofNullable(record), metadata)) {
               return Option.of(Pair.of(Option.of(record), metadata));
             }
             return Option.empty();

Review Comment:
   The logic in L278-282 and L481-488 differs slightly, specifically in the `else` block. Is that intended?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -275,43 +274,47 @@ protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T
       } else {
         switch (recordMergeMode) {
           case COMMIT_TIME_ORDERING:
-            return Option.empty();
+            return Option.of(Pair.of(Option.ofNullable(record), metadata));

Review Comment:
   Can we add tests for this case, please?
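
A test for this case could assert that, under `COMMIT_TIME_ORDERING`, the incoming log record now replaces the buffered one instead of being skipped. The sketch below does not drive the real `FileGroupRecordBuffer`. It models only the buffer-update step, using a plain map keyed by record key. It assumes log records arrive in commit order. `CommitTimeBufferSketch` and its methods are hypothetical names; a real test would go through the actual reader with log files.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical model of the COMMIT_TIME_ORDERING path for data records; not Hudi code. */
final class CommitTimeBufferSketch {
  // record key -> latest buffered value (log records are assumed to arrive in commit order)
  private final Map<String, String> buffer = new HashMap<>();

  /** Mirrors the changed branch: returns the value to store, or empty to keep the existing one. */
  static Optional<String> doProcessNextDataRecord(String newRecord, String existingRecord) {
    // Before this PR the branch returned Optional.empty(), i.e. the update was skipped.
    return Optional.of(newRecord);
  }

  void processNextDataRecord(String key, String newRecord) {
    String existing = buffer.get(key);
    if (existing == null) {
      buffer.put(key, newRecord); // first record for this key: nothing to merge
      return;
    }
    doProcessNextDataRecord(newRecord, existing).ifPresent(v -> buffer.put(key, v));
  }

  String get(String key) {
    return buffer.get(key);
  }

  public static void main(String[] args) {
    CommitTimeBufferSketch b = new CommitTimeBufferSketch();
    b.processNextDataRecord("k1", "v1@commit1");
    b.processNextDataRecord("k1", "v2@commit2");
    // The record from the later commit is expected to win.
    System.out.println("k1 -> " + b.get("k1"));
  }
}
```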



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -341,17 +344,18 @@ protected Option<Pair<Option<T>, Map<String, Object>>> doProcessNextDataRecord(T
   /**
    * Merge a delete record with another record (data, or delete).
    *
-   * @param deleteRecord
-   * @param existingRecordMetadataPair
-   * @return
+   * @param deleteRecord               The delete record
+   * @param existingRecordMetadataPair The existing record metadata pair
+   *
+   * @return The option of new delete record that needs to be updated with.
    */
   protected Option<DeleteRecord> doProcessNextDeletedRecord(DeleteRecord deleteRecord,
                                                             Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) {
     totalLogRecords++;
     if (existingRecordMetadataPair != null) {
       switch (recordMergeMode) {
         case COMMIT_TIME_ORDERING:
-          return Option.empty();
+          return Option.of(deleteRecord);

Review Comment:
   Do we have tests for this case?
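
For the delete path, a test could cover both orders: an upsert followed by a delete should leave the key deleted, and a delete followed by an upsert should not. The sketch below is a hypothetical model, not Hudi code. It represents a buffered delete as an empty `Optional` and assumes records arrive in commit order. `CommitTimeDeleteSketch` and its methods are illustrative names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical model of the COMMIT_TIME_ORDERING path for delete records; not Hudi code. */
final class CommitTimeDeleteSketch {
  // record key -> buffered value; Optional.empty() marks a buffered delete
  private final Map<String, Optional<String>> buffer = new HashMap<>();

  /** Mirrors the changed branch: returns the delete to apply, or empty to skip it. */
  static Optional<String> doProcessNextDeletedRecord(String deleteKey, Optional<String> existing) {
    // Before this PR the branch returned Optional.empty(); now the delete is always applied.
    return Optional.of(deleteKey);
  }

  void processNextDataRecord(String key, String value) {
    // Commit-time ordering: a later data record replaces whatever is buffered.
    buffer.put(key, Optional.of(value));
  }

  void processNextDeletedRecord(String key) {
    if (!buffer.containsKey(key)) {
      buffer.put(key, Optional.empty()); // nothing buffered yet: just record the delete
      return;
    }
    doProcessNextDeletedRecord(key, buffer.get(key)).ifPresent(k -> buffer.put(k, Optional.empty()));
  }

  boolean isDeleted(String key) {
    Optional<String> v = buffer.get(key);
    return v != null && !v.isPresent();
  }

  public static void main(String[] args) {
    CommitTimeDeleteSketch upsertThenDelete = new CommitTimeDeleteSketch();
    upsertThenDelete.processNextDataRecord("k1", "v1");
    upsertThenDelete.processNextDeletedRecord("k1");
    System.out.println("upsert then delete -> deleted = " + upsertThenDelete.isDeleted("k1"));

    CommitTimeDeleteSketch deleteThenUpsert = new CommitTimeDeleteSketch();
    deleteThenUpsert.processNextDeletedRecord("k1");
    deleteThenUpsert.processNextDataRecord("k1", "v2");
    System.out.println("delete then upsert -> deleted = " + deleteThenUpsert.isDeleted("k1"));
  }
}
```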



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -504,6 +529,19 @@ protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
     }
   }
 
+  /**
+   * Decides whether to keep the incoming record with ordering value comparison.
+   */
+  private boolean shouldKeepNewerRecord(Option<T> oldVal, Map<String, Object> oldMetadata, Option<T> newVal, Map<String, Object> newMetadata) {
+    Comparable newOrderingVal = readerContext.getOrderingValue(newVal, newMetadata, readerSchema, orderingFieldName);
+    if (isDeleteRecordWithNaturalOrder(newVal, newOrderingVal)) {
+      // handle records coming from DELETE statements(the orderingVal is constant 0)
+      return true;
+    }
+    Comparable oldOrderingVal = readerContext.getOrderingValue(oldVal, oldMetadata, readerSchema, orderingFieldName);
+    return newOrderingVal.compareTo(oldOrderingVal) >= 0;

Review Comment:
   This could be a rare corner case, and we may not have a solution for it, but I wanted to bring it up.
   What if the old record was missing an ordering value, so we treat it as 0,
   and the new one has a negative ordering value?
   With L542, we will favor the old record, right? That happens even though the old one had no ordering value and the new one has a valid one.
   
   I am not looking to fix this in this patch; I was just curious whether this is a known limitation.
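
**Reproducing the corner case.** The sketch below models the comparison in `shouldKeepNewerRecord` for data records. It assumes a missing ordering value is read as 0, as described above.

**A hypothetical contrast.** It also shows a presence-aware variant in which a record without an ordering value loses to any record that has one. This variant only illustrates the trade-off. It is not a proposal for how Hudi should behave.

**Names.** `MissingOrderingSketch`, `keepNewerDefaulting`, and `keepNewerPresenceAware` are made-up names.

```java
import java.util.Optional;

/** Hypothetical illustration of the missing-ordering-value corner case; not Hudi code. */
final class MissingOrderingSketch {
  static final int DEFAULT_ORDERING_VALUE = 0;

  /** Rule from the diff (data records only): missing values collapse to 0, ties go to the new record. */
  static boolean keepNewerDefaulting(Optional<Integer> oldOrd, Optional<Integer> newOrd) {
    int oldVal = oldOrd.orElse(DEFAULT_ORDERING_VALUE);
    int newVal = newOrd.orElse(DEFAULT_ORDERING_VALUE);
    return Integer.compare(newVal, oldVal) >= 0;
  }

  /** Hypothetical variant: a record without an ordering value loses to any record that has one. */
  static boolean keepNewerPresenceAware(Optional<Integer> oldOrd, Optional<Integer> newOrd) {
    if (oldOrd.isPresent() != newOrd.isPresent()) {
      return newOrd.isPresent(); // whichever side actually has a value wins
    }
    if (!newOrd.isPresent()) {
      return true; // neither side has a value: fall back to "newer wins"
    }
    return Integer.compare(newOrd.get(), oldOrd.get()) >= 0;
  }

  public static void main(String[] args) {
    Optional<Integer> oldMissing = Optional.empty();
    Optional<Integer> newNegative = Optional.of(-5);
    System.out.println("defaulting:     keep new = " + keepNewerDefaulting(oldMissing, newNegative));
    System.out.println("presence-aware: keep new = " + keepNewerPresenceAware(oldMissing, newNegative));
  }
}
```

**Results.** The first call returns `false`, so the old record is kept, matching the concern above. The second returns `true`.

**What the variant would require.** Real ordering values can legitimately be 0, so the variant needs to know whether a value was actually present. As used in this diff, `getOrderingValue` returns a plain `Comparable`. Presence would therefore have to be carried separately.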



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java:
##########
@@ -233,11 +233,12 @@ public void close() {
   /**
    * Merge two log data records if needed.
    *
-   * @param record
-   * @param metadata
-   * @param existingRecordMetadataPair
-   * @return
-   * @throws IOException
+   * @param record                     The record
+   * @param metadata                   The metadata
+   * @param existingRecordMetadataPair The existing record metadata pair
+   *
+   * @return The pair of the record that needs to be updated with and its metadata,
+   * returns empty to skip the update.

Review Comment:
   Similar to this, we need to prepare a table and document it for all three merge modes,
   so that anyone touching this code is well aware of what is expected of this method.
   We have made at least four or five bug fixes around this so far.
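
As a possible starting point for that table, here is a hypothetical Javadoc layout covering the three merge modes, attached to a small runnable model. The `COMMIT_TIME_ORDERING` and `EVENT_TIME_ORDERING` rows follow the branches in this diff, for data records with present ordering values. The `CUSTOM` row assumes that mode delegates to a user-supplied merger. Here the merger is modeled as a `BinaryOperator`; this is an assumption, not the Hudi merger API. `MergeContractSketch`, `MergeMode`, `Rec`, and `processNext` are illustrative names. Delete and missing-ordering rows, as in the event-time table above, would still need to be added.

```java
import java.util.Optional;
import java.util.function.BinaryOperator;

/** Hypothetical model used to host a per-merge-mode Javadoc table; not Hudi code. */
final class MergeContractSketch {
  enum MergeMode { COMMIT_TIME_ORDERING, EVENT_TIME_ORDERING, CUSTOM }

  /** Minimal stand-in for a log data record: a value plus its ordering value. */
  static final class Rec {
    final String value;
    final int ordering;

    Rec(String value, int ordering) {
      this.value = value;
      this.ordering = ordering;
    }
  }

  /**
   * Merges an incoming log data record with the buffered one.
   *
   * <table>
   *   <caption>Expected result per merge mode (sketch, data records only)</caption>
   *   <tr><th>Mode</th><th>Condition</th><th>Returns</th></tr>
   *   <tr><td>COMMIT_TIME_ORDERING</td><td>any</td><td>the incoming record</td></tr>
   *   <tr><td>EVENT_TIME_ORDERING</td><td>new ordering &gt;= old ordering</td><td>the incoming record</td></tr>
   *   <tr><td>EVENT_TIME_ORDERING</td><td>new ordering &lt; old ordering</td><td>empty (keep existing)</td></tr>
   *   <tr><td>CUSTOM</td><td>any</td><td>the merger's result (assumed)</td></tr>
   * </table>
   *
   * @return the record to store, or empty to skip the update
   */
  static Optional<Rec> processNext(MergeMode mode, Rec newRecord, Rec existing, BinaryOperator<Rec> merger) {
    switch (mode) {
      case COMMIT_TIME_ORDERING:
        return Optional.of(newRecord);
      case EVENT_TIME_ORDERING:
        return newRecord.ordering >= existing.ordering ? Optional.of(newRecord) : Optional.empty();
      case CUSTOM:
        return Optional.ofNullable(merger.apply(existing, newRecord));
      default:
        throw new IllegalArgumentException("Unknown mode: " + mode);
    }
  }

  public static void main(String[] args) {
    Rec older = new Rec("a", 5);
    Rec lower = new Rec("b", 3);
    BinaryOperator<Rec> keepExisting = (o, n) -> o; // toy merger for the CUSTOM row
    for (MergeMode mode : MergeMode.values()) {
      String result = processNext(mode, lower, older, keepExisting).map(r -> r.value).orElse("<skip>");
      System.out.println(mode + " -> " + result);
    }
  }
}
```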



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
