This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 21cbfce617 [HUDI-4864] Fix AWSDmsAvroPayload#combineAndGetUpdateValue
when using MOR snapshot query after delete operations with test (#6688)
21cbfce617 is described below
commit 21cbfce617689a1eff4d405bf5b19639d16e1c68
Author: Rahil C <[email protected]>
AuthorDate: Fri Sep 16 18:47:29 2022 -0700
[HUDI-4864] Fix AWSDmsAvroPayload#combineAndGetUpdateValue when using MOR
snapshot query after delete operations with test (#6688)
Co-authored-by: Rahil Chertara <[email protected]>
---
.../apache/hudi/common/model/AWSDmsAvroPayload.java | 9 ++++++---
.../hudi/common/model/TestAWSDmsAvroPayload.java | 21 +++++++++++++++++++++
2 files changed, 27 insertions(+), 3 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java
index 7153ea069d..fe044e0b43 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/AWSDmsAvroPayload.java
@@ -49,7 +49,7 @@ public class AWSDmsAvroPayload extends
OverwriteWithLatestAvroPayload {
}
public AWSDmsAvroPayload(Option<GenericRecord> record) {
- this(record.get(), 0); // natural order
+ this(record.isPresent() ? record.get() : null, 0); // natural order
}
/**
@@ -87,7 +87,10 @@ public class AWSDmsAvroPayload extends
OverwriteWithLatestAvroPayload {
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema)
throws IOException {
- IndexedRecord insertValue = super.getInsertValue(schema).get();
- return handleDeleteOperation(insertValue);
+ Option<IndexedRecord> insertValue = super.getInsertValue(schema);
+ if (!insertValue.isPresent()) {
+ return Option.empty();
+ }
+ return handleDeleteOperation(insertValue.get());
}
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java
index a60f4ff6a7..07bc1d6f43 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestAWSDmsAvroPayload.java
@@ -108,6 +108,27 @@ public class TestAWSDmsAvroPayload {
}
+ @Test
+ public void testDeleteWithEmptyPayLoad() {
+ Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
+ Properties properties = new Properties();
+
+ GenericRecord oldRecord = new GenericData.Record(avroSchema);
+ oldRecord.put("field1", 2);
+ oldRecord.put("Op", "U");
+
+ AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.empty());
+
+ try {
+ Option<IndexedRecord> outputPayload =
payload.combineAndGetUpdateValue(oldRecord, avroSchema, properties);
+ // expect nothing to be committed to table
+ assertFalse(outputPayload.isPresent());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail("Unexpected exception");
+ }
+ }
+
@Test
public void testPreCombineWithDelete() {
Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);