This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new dfac12385bd [HUDI-5802] Allow configuration for deletes in
DefaultHoodieRecordPayload (#7961)
dfac12385bd is described below
commit dfac12385bdb2b8363e53e18d1b477433cf944eb
Author: Lokesh Jain <[email protected]>
AuthorDate: Mon Feb 20 15:05:08 2023 +0530
[HUDI-5802] Allow configuration for deletes in DefaultHoodieRecordPayload
(#7961)
Modify DefaultHoodieRecordPayload to be able to handle a configured delete
key and marker.
---
.../common/model/DefaultHoodieRecordPayload.java | 36 ++++++++++--
.../apache/hudi/common/util/ValidationUtils.java | 9 +++
.../model/TestDefaultHoodieRecordPayload.java | 65 ++++++++++++++++++++++
.../hudi/command/payload/ExpressionPayload.scala | 4 +-
4 files changed, 107 insertions(+), 7 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
index a218e9dc33d..76cdea48ef5 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -20,6 +20,8 @@ package org.apache.hudi.common.model;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.avro.Schema;
@@ -37,8 +39,9 @@ import java.util.Properties;
* 1. preCombine - Picks the latest delta record for a key, based on an
ordering field 2. combineAndGetUpdateValue/getInsertValue - Chooses the latest
record based on ordering field value.
*/
public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload
{
-
public static final String METADATA_EVENT_TIME_KEY =
"metadata.event_time.key";
+ public static final String DELETE_KEY = "hoodie.payload.delete.field";
+ public static final String DELETE_MARKER = "hoodie.payload.delete.marker";
private Option<Object> eventTime = Option.empty();
public DefaultHoodieRecordPayload(GenericRecord record, Comparable
orderingVal) {
@@ -51,7 +54,7 @@ public class DefaultHoodieRecordPayload extends
OverwriteWithLatestAvroPayload {
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema, Properties properties) throws IOException {
- if (recordBytes.length == 0 || isDeletedRecord) {
+ if (recordBytes.length == 0) {
return Option.empty();
}
@@ -71,18 +74,41 @@ public class DefaultHoodieRecordPayload extends
OverwriteWithLatestAvroPayload {
/*
* Now check if the incoming record is a delete record.
*/
- return Option.of(incomingRecord);
+ return isDeleteRecord(incomingRecord, properties) ? Option.empty() :
Option.of(incomingRecord);
}
+  /**
+   * Returns the incoming record for insertion, or {@link Option#empty()} when the payload carries
+   * no bytes or when {@code isDeleteRecord(GenericRecord, Properties)} reports the record as a delete.
+   * The event time is captured from the record (via {@code updateEventTime}) before the delete check.
+   */
@Override
public Option<IndexedRecord> getInsertValue(Schema schema, Properties
properties) throws IOException {
- if (recordBytes.length == 0 || isDeletedRecord) {
+ if (recordBytes.length == 0) {
return Option.empty();
}
GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes,
schema);
eventTime = updateEventTime(incomingRecord, properties);
- return Option.of(incomingRecord);
+  // Delete detection now honors the configurable DELETE_KEY / DELETE_MARKER properties.
+ return isDeleteRecord(incomingRecord, properties) ? Option.empty() :
Option.of(incomingRecord);
+ }
+
+  /**
+   * Determines whether the given record represents a delete.
+   *
+   * <p>When {@link #DELETE_KEY} is not configured, this falls back to the single-argument
+   * {@code isDeleteRecord(GenericRecord)}. Otherwise the record is a delete only when the
+   * configured field exists in the record's schema, holds a non-null value, and that value's
+   * {@code toString()} equals the configured {@link #DELETE_MARKER}.
+   *
+   * @param genericRecord instance of {@link GenericRecord} of interest.
+   * @param properties payload related properties.
+   * @return {@code true} if record represents a delete record, {@code false} otherwise.
+   * @throws IllegalArgumentException if {@link #DELETE_KEY} is configured without {@link #DELETE_MARKER}.
+   */
+  protected boolean isDeleteRecord(GenericRecord genericRecord, Properties properties) {
+    final String deleteKey = properties.getProperty(DELETE_KEY);
+    if (StringUtils.isNullOrEmpty(deleteKey)) {
+      return isDeleteRecord(genericRecord);
+    }
+
+    // Read the marker once; it is both validated and compared below.
+    final String deleteMarker = properties.getProperty(DELETE_MARKER);
+    ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(deleteMarker),
+        () -> DELETE_MARKER + " should be configured with " + DELETE_KEY);
+
+    // Newer Avro versions throw from GenericRecord.get when the field name is not part of the
+    // schema, so check the schema before reading the field.
+    if (genericRecord.getSchema().getField(deleteKey) == null) {
+      return false;
+    }
+    final Object deleteFieldValue = genericRecord.get(deleteKey);
+    return deleteFieldValue != null && deleteMarker.equals(deleteFieldValue.toString());
}
private static Option<Object> updateEventTime(GenericRecord record,
Properties properties) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/ValidationUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/ValidationUtils.java
index ad8c53c8213..3350c9a8608 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ValidationUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ValidationUtils.java
@@ -18,6 +18,8 @@
package org.apache.hudi.common.util;
+import java.util.function.Supplier;
+
/**
* Simple utility to test validation conditions (to replace Guava's
PreConditions)
*/
@@ -41,6 +43,13 @@ public class ValidationUtils {
}
}
+  /**
+   * Ensures the truth of an expression, throwing an {@link IllegalArgumentException} with the
+   * supplied message otherwise.
+   *
+   * <p>The supplier is invoked only when {@code expression} is {@code false}, so callers can build
+   * costly messages without paying for them on the success path.
+   *
+   * @param expression           a boolean expression that must hold
+   * @param errorMessageSupplier supplies the exception message; evaluated lazily, only on failure
+   * @throws IllegalArgumentException if {@code expression} is {@code false}
+   */
+  public static void checkArgument(final boolean expression, final Supplier<String> errorMessageSupplier) {
+    if (!expression) {
+      throw new IllegalArgumentException(errorMessageSupplier.get());
+    }
+  }
+
/**
* Ensures the truth of an expression involving the state of the calling
instance, but not
* involving any parameters to the calling method.
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
index c0896e723ea..6bc0783e652 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
@@ -33,9 +33,12 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
/**
* Unit tests {@link DefaultHoodieRecordPayload}.
@@ -110,6 +113,68 @@ public class TestDefaultHoodieRecordPayload {
assertFalse(payload2.combineAndGetUpdateValue(record1, schema,
props).isPresent());
}
+  @Test
+  public void testDeleteKey() throws IOException {
+    // Configure "ts" as the delete field and 1 as the value that marks a delete.
+    props.setProperty(DELETE_KEY, "ts");
+    props.setProperty(DELETE_MARKER, String.valueOf(1L));
+
+    // ts = 0 does not match the configured marker: a regular record.
+    GenericRecord liveRecord = new GenericData.Record(schema);
+    liveRecord.put("id", "1");
+    liveRecord.put("partition", "partition0");
+    liveRecord.put("ts", 0L);
+    liveRecord.put("_hoodie_is_deleted", false);
+
+    // ts = 1 matches the configured marker: treated as a delete.
+    GenericRecord markedRecord = new GenericData.Record(schema);
+    markedRecord.put("id", "2");
+    markedRecord.put("partition", "partition1");
+    markedRecord.put("ts", 1L);
+    markedRecord.put("_hoodie_is_deleted", false);
+
+    // _hoodie_is_deleted = true is not consulted once a custom delete key is configured.
+    GenericRecord flaggedRecord = new GenericData.Record(schema);
+    flaggedRecord.put("id", "2");
+    flaggedRecord.put("partition", "partition1");
+    flaggedRecord.put("ts", 2L);
+    flaggedRecord.put("_hoodie_is_deleted", true);
+
+    DefaultHoodieRecordPayload livePayload = new DefaultHoodieRecordPayload(liveRecord, 1);
+    DefaultHoodieRecordPayload markedPayload = new DefaultHoodieRecordPayload(markedRecord, 2);
+    DefaultHoodieRecordPayload flaggedPayload = new DefaultHoodieRecordPayload(flaggedRecord, 2);
+
+    // getInsertValue: only the marker-matching record is dropped.
+    assertEquals(liveRecord, livePayload.getInsertValue(schema, props).get());
+    assertEquals(flaggedRecord, flaggedPayload.getInsertValue(schema, props).get());
+    assertFalse(markedPayload.getInsertValue(schema, props).isPresent());
+
+    // combineAndGetUpdateValue: the marker is checked on the payload's own record, not the current value.
+    assertEquals(markedRecord, livePayload.combineAndGetUpdateValue(markedRecord, schema, props).get());
+    assertEquals(flaggedRecord, livePayload.combineAndGetUpdateValue(flaggedRecord, schema, props).get());
+    assertFalse(markedPayload.combineAndGetUpdateValue(liveRecord, schema, props).isPresent());
+  }
+
+  @Test
+  public void testDeleteKeyConfiguration() throws IOException {
+    // DELETE_KEY is configured while DELETE_MARKER is deliberately left unset.
+    props.setProperty(DELETE_KEY, "ts");
+
+    GenericRecord sampleRecord = new GenericData.Record(schema);
+    sampleRecord.put("id", "1");
+    sampleRecord.put("partition", "partition0");
+    sampleRecord.put("ts", 0L);
+    sampleRecord.put("_hoodie_is_deleted", false);
+
+    DefaultHoodieRecordPayload samplePayload = new DefaultHoodieRecordPayload(sampleRecord, 1);
+
+    // Both entry points must reject a DELETE_KEY that has no DELETE_MARKER.
+    try {
+      samplePayload.getInsertValue(schema, props).get();
+      fail("Should fail");
+    } catch (IllegalArgumentException expected) {
+      // expected: DELETE_MARKER is missing
+    }
+
+    try {
+      samplePayload.combineAndGetUpdateValue(sampleRecord, schema, props).get();
+      fail("Should fail");
+    } catch (IllegalArgumentException expected) {
+      // expected: DELETE_MARKER is missing
+    }
+  }
+
@Test
public void testGetEmptyMetadata() {
GenericRecord record = new GenericData.Record(schema);
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index 015ba2e2e8e..39a065f1df5 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -26,7 +26,7 @@ import org.apache.hudi.SparkAdapterSupport.sparkAdapter
import org.apache.hudi.avro.AvroSchemaUtils.isNullable
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
-import org.apache.hudi.common.model.BaseAvroPayload.isDeleteRecord
+import org.apache.hudi.common.model.BaseAvroPayload
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload,
HoodiePayloadProps, HoodieRecord}
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.{BinaryUtil, ValidationUtils, Option =>
HOption}
@@ -238,7 +238,7 @@ class ExpressionPayload(@transient record: GenericRecord,
val recordSchema = getRecordSchema(properties)
val incomingRecord = ConvertibleRecord(bytesToAvro(recordBytes,
recordSchema))
- if (isDeleteRecord(incomingRecord.asAvro)) {
+ if (BaseAvroPayload.isDeleteRecord(incomingRecord.asAvro)) {
HOption.empty[IndexedRecord]()
} else if (isMORTable(properties)) {
// For the MOR table, both the matched and not-matched record will step
into the getInsertValue() method.