This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 88caeaa11c0 [HUDI-8521] Use commit time merge in preCombine for
OverwriteWithLatestAvroPayload (#12297)
88caeaa11c0 is described below
commit 88caeaa11c09acc94554a7dc1d63f6e4b28dc8a0
Author: Lin Liu <[email protected]>
AuthorDate: Mon Nov 25 16:40:35 2024 -0800
[HUDI-8521] Use commit time merge in preCombine for
OverwriteWithLatestAvroPayload (#12297)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../org/apache/hudi/common/model/AWSDmsAvroPayload.java | 14 ++++++++++++++
.../hudi/common/model/DefaultHoodieRecordPayload.java | 14 ++++++++++++++
.../model/OverwriteNonDefaultsWithLatestAvroPayload.java | 14 ++++++++++++++
.../hudi/common/model/OverwriteWithLatestAvroPayload.java | 15 +++++----------
.../model/debezium/AbstractDebeziumAvroPayload.java | 14 ++++++++++++++
.../common/model/TestOverwriteWithLatestAvroPayload.java | 4 ++--
.../common/testutils/reader/HoodieRecordTestPayload.java | 13 +++++++++++++
.../scala/org/apache/hudi/TestDataSourceDefaults.scala | 6 +++---
8 files changed, 79 insertions(+), 15 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java
index f22cfc08334..d0a004b796b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java
@@ -68,6 +68,20 @@ public class AWSDmsAvroPayload extends
OverwriteWithLatestAvroPayload {
return delete ? Option.empty() : Option.of(insertValue);
}
+ @Override
+ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue) {
+ if (oldValue.recordBytes.length == 0) {
+ // use natural order for delete record
+ return this;
+ }
+ if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
+ // pick the payload with greatest ordering value
+ return oldValue;
+ } else {
+ return this;
+ }
+ }
+
@Override
public Option<IndexedRecord> getInsertValue(Schema schema, Properties
properties) throws IOException {
return getInsertValue(schema);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
index a3e6ce1f133..40f7558a2b6 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -59,6 +59,20 @@ public class DefaultHoodieRecordPayload extends
OverwriteWithLatestAvroPayload {
this(record.isPresent() ? record.get() : null, 0); // natural order
}
+ @Override
+ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue) {
+ if (oldValue.recordBytes.length == 0) {
+ // use natural order for delete record
+ return this;
+ }
+ if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
+ // pick the payload with greatest ordering value
+ return oldValue;
+ } else {
+ return this;
+ }
+ }
+
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema, Properties properties) throws IOException {
if (recordBytes.length == 0) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java
index 6e060a735fc..a4a4f7c297b 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteNonDefaultsWithLatestAvroPayload.java
@@ -47,6 +47,20 @@ public class OverwriteNonDefaultsWithLatestAvroPayload
extends OverwriteWithLate
super(record); // natural order
}
+ @Override
+ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue) {
+ if (oldValue.recordBytes.length == 0) {
+ // use natural order for delete record
+ return this;
+ }
+ if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
+ // pick the payload with greatest ordering value
+ return oldValue;
+ } else {
+ return this;
+ }
+ }
+
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema) throws IOException {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
index dac9b828896..b0ed1ef22a2 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
@@ -48,16 +48,7 @@ public class OverwriteWithLatestAvroPayload extends
BaseAvroPayload
@Override
public OverwriteWithLatestAvroPayload
preCombine(OverwriteWithLatestAvroPayload oldValue) {
- if (oldValue.recordBytes.length == 0) {
- // use natural order for delete record
- return this;
- }
- if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
- // pick the payload with greatest ordering value
- return oldValue;
- } else {
- return this;
- }
+ return this;
}
@Override
@@ -88,4 +79,8 @@ public class OverwriteWithLatestAvroPayload extends
BaseAvroPayload
public Comparable<?> getOrderingValue() {
return this.orderingVal;
}
+
+ public byte[] getRecordBytes() {
+ return recordBytes;
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java
index 82395d6f606..065286e7d92 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/AbstractDebeziumAvroPayload.java
@@ -54,6 +54,20 @@ public abstract class AbstractDebeziumAvroPayload extends
OverwriteWithLatestAvr
super(record);
}
+ @Override
+ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue) {
+ if (oldValue.getRecordBytes().length == 0) {
+ // use natural order for delete record
+ return this;
+ }
+ if (((Comparable) oldValue.getOrderingValue()).compareTo(orderingVal) > 0) {
+ // pick the payload with greatest ordering value
+ return oldValue;
+ } else {
+ return this;
+ }
+ }
+
@Override
public Option<IndexedRecord> getInsertValue(Schema schema) throws
IOException {
Option<IndexedRecord> insertValue = getInsertRecord(schema);
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
index 7c5951a7cac..6782c2dc9db 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
@@ -64,7 +64,7 @@ public class TestOverwriteWithLatestAvroPayload {
OverwriteWithLatestAvroPayload payload1 = new
OverwriteWithLatestAvroPayload(record1, 1);
OverwriteWithLatestAvroPayload payload2 = new
OverwriteWithLatestAvroPayload(record2, 2);
- assertEquals(payload1.preCombine(payload2), payload2);
+ assertEquals(payload1.preCombine(payload2), payload1);
assertEquals(payload2.preCombine(payload1), payload2);
assertEquals(record1, payload1.getInsertValue(schema).get());
@@ -90,7 +90,7 @@ public class TestOverwriteWithLatestAvroPayload {
OverwriteWithLatestAvroPayload payload1 = new
OverwriteWithLatestAvroPayload(record1, 1);
OverwriteWithLatestAvroPayload payload2 = new
OverwriteWithLatestAvroPayload(delRecord1, 2);
- assertEquals(payload1.preCombine(payload2), payload2);
+ assertEquals(payload1.preCombine(payload2), payload1);
assertEquals(payload2.preCombine(payload1), payload2);
assertEquals(record1, payload1.getInsertValue(schema).get());
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieRecordTestPayload.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieRecordTestPayload.java
index bff49a025fa..93db765c4b1 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieRecordTestPayload.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/reader/HoodieRecordTestPayload.java
@@ -52,6 +52,19 @@ public class HoodieRecordTestPayload extends
OverwriteWithLatestAvroPayload {
super(record, orderingVal);
}
+ public HoodieRecordTestPayload preCombine(HoodieRecordTestPayload oldValue) {
+ if (oldValue.recordBytes.length == 0) {
+ // use natural order for delete record
+ return this;
+ }
+ if (oldValue.orderingVal.compareTo(orderingVal) > 0) {
+ // pick the payload with greatest ordering value
+ return oldValue;
+ } else {
+ return this;
+ }
+ }
+
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema, Properties properties) throws IOException {
// The new record is a delete record.
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
index 5f0cd498338..8a3fdb0a4e3 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestDataSourceDefaults.scala
@@ -547,7 +547,7 @@ class TestDataSourceDefaults extends ScalaAssertionSupport {
// it will provide the record with greatest combine value
val combinedPayload12 = overWritePayload1.preCombine(overWritePayload2)
val combinedGR12 =
combinedPayload12.getInsertValue(schema).get().asInstanceOf[GenericRecord]
- assertEquals("field2", combinedGR12.get("field1").toString)
+ assertEquals("field1", combinedGR12.get("field1").toString)
// and it will be deterministic, to order of processing.
val combinedPayload21 = overWritePayload2.preCombine(overWritePayload1)
@@ -570,10 +570,10 @@ class TestDataSourceDefaults extends
ScalaAssertionSupport {
val laterOrderingVal: Object = laterRecord.get("favoriteIntNumber")
val newerPayload = new OverwriteWithLatestAvroPayload(laterRecord,
HoodieAvroUtils.convertValueForSpecificDataTypes(fieldSchema, laterOrderingVal,
false).asInstanceOf[Comparable[_]])
- // it will provide the record with greatest combine value
+ // it always returns the latest payload.
val preCombinedPayload = basePayload.preCombine(newerPayload)
val precombinedGR =
preCombinedPayload.getInsertValue(schema).get().asInstanceOf[GenericRecord]
- assertEquals("field2", precombinedGR.get("field1").toString)
+ assertEquals("field1", precombinedGR.get("field1").toString)
}
@Test def testDefaultHoodieRecordPayloadCombineAndGetUpdateValue(): Unit = {