This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 88caeaa11c0 [HUDI-8521] Use commit time merge in preCombine for OverwriteWithLatestAvroPayload (#12297)
88caeaa11c0 is described below

commit 88caeaa11c09acc94554a7dc1d63f6e4b28dc8a0
Author: Lin Liu <[email protected]>
AuthorDate: Mon Nov 25 16:40:35 2024 -0800

    [HUDI-8521] Use commit time merge in preCombine for OverwriteWithLatestAvroPayload (#12297)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
---
 .../org/apache/hudi/common/model/AWSDmsAvroPayload.java   | 14 ++++++++++++++
 .../hudi/common/model/DefaultHoodieRecordPayload.java     | 14 ++++++++++++++
 .../model/OverwriteNonDefaultsWithLatestAvroPayload.java  | 14 ++++++++++++++
 .../hudi/common/model/OverwriteWithLatestAvroPayload.java | 15 +++++----------
 .../model/debezium/AbstractDebeziumAvroPayload.java       | 14 ++++++++++++++
 .../common/model/TestOverwriteWithLatestAvroPayload.java  |  4 ++--
 .../common/testutils/reader/HoodieRecordTestPayload.java  | 13 +++++++++++++
 .../scala/org/apache/hudi/TestDataSourceDefaults.scala    |  6 +++---
 8 files changed, 79 insertions(+), 15 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java
index f22cfc08334..d0a004b796b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java
@@ -68,6 +68,20 @@ public class AWSDmsAvroPayload extends 
OverwriteWithLatestAvroPayload {
     return delete ? Option.empty() : Option.of(insertValue);
   }
 
+  @Override
+  public OverwriteWithLatestAvroPayload 
preCombine(OverwriteWithLatestAvroPayload oldValue) {
+    if (oldValue.recordBytes.length == 0) {
+      // use natural order for delete record
+      return this;
+    }
+    if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
+      // pick the payload with greatest ordering value
+      return oldValue;
+    } else {
+      return this;
+    }
+  }
+
   @Override
   public Option<IndexedRecord> getInsertValue(Schema schema, Properties 
properties) throws IOException {
     return getInsertValue(schema);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
index a3e6ce1f133..40f7558a2b6 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -59,6 +59,20 @@ public class DefaultHoodieRecordPayload extends 
OverwriteWithLatestAvroPayload {
     this(record.isPresent() ? record.get() : null, 0); // natural order
   }
 
+  @Override
+  public OverwriteWithLatestAvroPayload 
preCombine(OverwriteWithLatestAvroPayload oldValue) {
+    if (oldValue.recordBytes.length == 0) {
+      // use natural order for delete record
+      return this;
+    }
+    if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
+      // pick the payload with greatest ordering value
+      return oldValue;
+    } else {
+      return this;
+    }
+  }
+
   @Override
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema, Properties properties) throws IOException {
     if (recordBytes.length == 0) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java
index 6e060a735fc..a4a4f7c297b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java
@@ -47,6 +47,20 @@ public class OverwriteNonDefaultsWithLatestAvroPayload 
extends OverwriteWithLate
     super(record); // natural order
   }
 
+  @Override
+  public OverwriteWithLatestAvroPayload 
preCombine(OverwriteWithLatestAvroPayload oldValue) {
+    if (oldValue.recordBytes.length == 0) {
+      // use natural order for delete record
+      return this;
+    }
+    if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
+      // pick the payload with greatest ordering value
+      return oldValue;
+    } else {
+      return this;
+    }
+  }
+
   @Override
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema) throws IOException {
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
index dac9b828896..b0ed1ef22a2 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
@@ -48,16 +48,7 @@ public class OverwriteWithLatestAvroPayload extends 
BaseAvroPayload
 
   @Override
   public OverwriteWithLatestAvroPayload 
preCombine(OverwriteWithLatestAvroPayload oldValue) {
-    if (oldValue.recordBytes.length == 0) {
-      // use natural order for delete record
-      return this;
-    }
-    if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
-      // pick the payload with greatest ordering value
-      return oldValue;
-    } else {
-      return this;
-    }
+    return this;
   }
 
   @Override
@@ -88,4 +79,8 @@ public class OverwriteWithLatestAvroPayload extends 
BaseAvroPayload
   public Comparable<?> getOrderingValue() {
     return this.orderingVal;
   }
+
+  public byte[] getRecordBytes() {
+    return recordBytes;
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java
index 82395d6f606..065286e7d92 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java
@@ -54,6 +54,20 @@ public abstract class AbstractDebeziumAvroPayload extends 
OverwriteWithLatestAvr
     super(record);
   }
 
+  @Override
+  public OverwriteWithLatestAvroPayload 
preCombine(OverwriteWithLatestAvroPayload oldValue) {
+    if (oldValue.getRecordBytes().length == 0) {
+      // use natural order for delete record
+      return this;
+    }
+    if (((Comparable) oldValue.getOrderingValue()).compareTo(orderingVal) > 0) 
{
+      // pick the payload with greatest ordering value
+      return oldValue;
+    } else {
+      return this;
+    }
+  }
+
   @Override
   public Option<IndexedRecord> getInsertValue(Schema schema) throws 
IOException {
     Option<IndexedRecord> insertValue = getInsertRecord(schema);
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
index 7c5951a7cac..6782c2dc9db 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
@@ -64,7 +64,7 @@ public class TestOverwriteWithLatestAvroPayload {
 
     OverwriteWithLatestAvroPayload payload1 = new 
OverwriteWithLatestAvroPayload(record1, 1);
     OverwriteWithLatestAvroPayload payload2 = new 
OverwriteWithLatestAvroPayload(record2, 2);
-    assertEquals(payload1.preCombine(payload2), payload2);
+    assertEquals(payload1.preCombine(payload2), payload1);
     assertEquals(payload2.preCombine(payload1), payload2);
 
     assertEquals(record1, payload1.getInsertValue(schema).get());
@@ -90,7 +90,7 @@ public class TestOverwriteWithLatestAvroPayload {
 
     OverwriteWithLatestAvroPayload payload1 = new 
OverwriteWithLatestAvroPayload(record1, 1);
     OverwriteWithLatestAvroPayload payload2 = new 
OverwriteWithLatestAvroPayload(delRecord1, 2);
-    assertEquals(payload1.preCombine(payload2), payload2);
+    assertEquals(payload1.preCombine(payload2), payload1);
     assertEquals(payload2.preCombine(payload1), payload2);
 
     assertEquals(record1, payload1.getInsertValue(schema).get());
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieRecordTestPayload.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieRecordTestPayload.java
index bff49a025fa..93db765c4b1 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieRecordTestPayload.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieRecordTestPayload.java
@@ -52,6 +52,19 @@ public class HoodieRecordTestPayload extends 
OverwriteWithLatestAvroPayload {
     super(record, orderingVal);
   }
 
+  public HoodieRecordTestPayload preCombine(HoodieRecordTestPayload oldValue) {
+    if (oldValue.recordBytes.length == 0) {
+      // use natural order for delete record
+      return this;
+    }
+    if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
+      // pick the payload with greatest ordering value
+      return oldValue;
+    } else {
+      return this;
+    }
+  }
+
   @Override
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema, Properties properties) throws IOException {
     // The new record is a delete record.
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
index 5f0cd498338..8a3fdb0a4e3 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
@@ -547,7 +547,7 @@ class TestDataSourceDefaults extends ScalaAssertionSupport {
     // it will provide the record with greatest combine value
     val combinedPayload12 = overWritePayload1.preCombine(overWritePayload2)
     val combinedGR12 = 
combinedPayload12.getInsertValue(schema).get().asInstanceOf[GenericRecord]
-    assertEquals("field2", combinedGR12.get("field1").toString)
+    assertEquals("field1", combinedGR12.get("field1").toString)
 
     // and it will be deterministic, to order of processing.
     val combinedPayload21 = overWritePayload2.preCombine(overWritePayload1)
@@ -570,10 +570,10 @@ class TestDataSourceDefaults extends 
ScalaAssertionSupport {
     val laterOrderingVal: Object = laterRecord.get("favoriteIntNumber")
     val newerPayload = new OverwriteWithLatestAvroPayload(laterRecord, 
HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, laterOrderingVal, 
false).asInstanceOf[Comparable[_]])
 
-    // it will provide the record with greatest combine value
+    // it always returns the latest payload.
     val preCombinedPayload = basePayload.preCombine(newerPayload)
     val precombinedGR = 
preCombinedPayload.getInsertValue(schema).get().asInstanceOf[GenericRecord]
-    assertEquals("field2", precombinedGR.get("field1").toString)
+    assertEquals("field1", precombinedGR.get("field1").toString)
   }
 
   @Test def testDefaultHoodieRecordPayloadCombineAndGetUpdateValue(): Unit = {

Reply via email to