This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5e61454 [HUDI-802] AWSDmsTransformer does not handle insert and delete of a row in a single batch correctly (#2084)
5e61454 is described below
commit 5e61454a6c343225e6ca90c4be79e0c2937a748a
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Fri Sep 11 16:11:42 2020 -0700
[HUDI-802] AWSDmsTransformer does not handle insert and delete of a row in a single batch correctly (#2084)
---
.../org/apache/hudi/payload/AWSDmsAvroPayload.java | 23 +++-
.../apache/hudi/payload/TestAWSDmsAvroPayload.java | 132 +++++++++++++++++++++
2 files changed, 151 insertions(+), 4 deletions(-)
diff --git a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java
index 975151c..73711c7 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/payload/AWSDmsAvroPayload.java
@@ -53,10 +53,12 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload {
this(record.get(), (record1) -> 0); // natural order
}
- @Override
- public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema)
- throws IOException {
- IndexedRecord insertValue = getInsertValue(schema).get();
+ /**
+ *
+ * Handle a possible delete - check for "D" in Op column and return empty
row if found.
+ * @param insertValue The new row that is being "inserted".
+ */
+ private Option<IndexedRecord> handleDeleteOperation(IndexedRecord
insertValue) throws IOException {
boolean delete = false;
if (insertValue instanceof GenericRecord) {
GenericRecord record = (GenericRecord) insertValue;
@@ -65,4 +67,17 @@ public class AWSDmsAvroPayload extends OverwriteWithLatestAvroPayload {
return delete ? Option.empty() : Option.of(insertValue);
}
+
+ @Override
+ public Option<IndexedRecord> getInsertValue(Schema schema) throws
IOException {
+ IndexedRecord insertValue = super.getInsertValue(schema).get();
+ return handleDeleteOperation(insertValue);
+ }
+
+ @Override
+ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema)
+ throws IOException {
+ IndexedRecord insertValue = super.getInsertValue(schema).get();
+ return handleDeleteOperation(insertValue);
+ }
}
diff --git a/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java b/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java
new file mode 100644
index 0000000..802096a
--- /dev/null
+++ b/hudi-spark/src/test/java/org/apache/hudi/payload/TestAWSDmsAvroPayload.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.payload;
+
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class TestAWSDmsAvroPayload {
+
+ private static final String AVRO_SCHEMA_STRING = "{\"type\": \"record\","
+ + "\"name\": \"events\"," + "\"fields\": [ "
+ + "{\"name\": \"field1\", \"type\" : \"int\"},"
+ + "{\"name\": \"Op\", \"type\": \"string\"}"
+ + "]}";
+
+ @Test
+ public void testInsert() {
+
+ Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
+ GenericRecord record = new GenericData.Record(avroSchema);
+ record.put("field1", 0);
+ record.put("Op", "I");
+
+ AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(record));
+
+ try {
+ Option<IndexedRecord> outputPayload = payload.getInsertValue(avroSchema);
+ assertTrue((int) outputPayload.get().get(0) == 0);
+ assertTrue(outputPayload.get().get(1).toString().equals("I"));
+ } catch (Exception e) {
+ fail("Unexpected exception");
+ }
+
+ }
+
+ @Test
+ public void testUpdate() {
+ Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
+ GenericRecord newRecord = new GenericData.Record(avroSchema);
+ newRecord.put("field1", 1);
+ newRecord.put("Op", "U");
+
+ GenericRecord oldRecord = new GenericData.Record(avroSchema);
+ oldRecord.put("field1", 0);
+ oldRecord.put("Op", "I");
+
+ AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(newRecord));
+
+ try {
+ Option<IndexedRecord> outputPayload =
payload.combineAndGetUpdateValue(oldRecord, avroSchema);
+ assertTrue((int) outputPayload.get().get(0) == 1);
+ assertTrue(outputPayload.get().get(1).toString().equals("U"));
+ } catch (Exception e) {
+ fail("Unexpected exception");
+ }
+
+ }
+
+ @Test
+ public void testDelete() {
+ Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
+ GenericRecord deleteRecord = new GenericData.Record(avroSchema);
+ deleteRecord.put("field1", 2);
+ deleteRecord.put("Op", "D");
+
+ GenericRecord oldRecord = new GenericData.Record(avroSchema);
+ oldRecord.put("field1", 2);
+ oldRecord.put("Op", "U");
+
+ AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(deleteRecord));
+
+ try {
+ Option<IndexedRecord> outputPayload =
payload.combineAndGetUpdateValue(oldRecord, avroSchema);
+ // expect nothing to be comitted to table
+ assertFalse(outputPayload.isPresent());
+ } catch (Exception e) {
+ fail("Unexpected exception");
+ }
+
+ }
+
+ @Test
+ public void testPreCombineWithDelete() {
+ Schema avroSchema = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
+ GenericRecord deleteRecord = new GenericData.Record(avroSchema);
+ deleteRecord.put("field1", 4);
+ deleteRecord.put("Op", "D");
+
+ GenericRecord oldRecord = new GenericData.Record(avroSchema);
+ oldRecord.put("field1", 4);
+ oldRecord.put("Op", "I");
+
+ AWSDmsAvroPayload payload = new AWSDmsAvroPayload(Option.of(deleteRecord));
+ AWSDmsAvroPayload insertPayload = new
AWSDmsAvroPayload(Option.of(oldRecord));
+
+ try {
+ OverwriteWithLatestAvroPayload output =
payload.preCombine(insertPayload);
+ Option<IndexedRecord> outputPayload = output.getInsertValue(avroSchema);
+ // expect nothing to be comitted to table
+ assertFalse(outputPayload.isPresent());
+ } catch (Exception e) {
+ fail("Unexpected exception");
+ }
+ }
+}