This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new dfac12385bd [HUDI-5802] Allow configuration for deletes in 
DefaultHoodieRecordPayload (#7961)
dfac12385bd is described below

commit dfac12385bdb2b8363e53e18d1b477433cf944eb
Author: Lokesh Jain <[email protected]>
AuthorDate: Mon Feb 20 15:05:08 2023 +0530

    [HUDI-5802] Allow configuration for deletes in DefaultHoodieRecordPayload 
(#7961)
    
    Modify DefaultHoodieRecordPayload to be able to handle a configured delete 
key and marker.
---
 .../common/model/DefaultHoodieRecordPayload.java   | 36 ++++++++++--
 .../apache/hudi/common/util/ValidationUtils.java   |  9 +++
 .../model/TestDefaultHoodieRecordPayload.java      | 65 ++++++++++++++++++++++
 .../hudi/command/payload/ExpressionPayload.scala   |  4 +-
 4 files changed, 107 insertions(+), 7 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
index a218e9dc33d..76cdea48ef5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -20,6 +20,8 @@ package org.apache.hudi.common.model;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import org.apache.avro.Schema;
@@ -37,8 +39,9 @@ import java.util.Properties;
  * 1. preCombine - Picks the latest delta record for a key, based on an 
ordering field 2. combineAndGetUpdateValue/getInsertValue - Chooses the latest 
record based on ordering field value.
  */
 public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload 
{
-
   public static final String METADATA_EVENT_TIME_KEY = 
"metadata.event_time.key";
+  public static final String DELETE_KEY = "hoodie.payload.delete.field";
+  public static final String DELETE_MARKER = "hoodie.payload.delete.marker";
   private Option<Object> eventTime = Option.empty();
 
   public DefaultHoodieRecordPayload(GenericRecord record, Comparable 
orderingVal) {
@@ -51,7 +54,7 @@ public class DefaultHoodieRecordPayload extends 
OverwriteWithLatestAvroPayload {
 
   @Override
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema, Properties properties) throws IOException {
-    if (recordBytes.length == 0 || isDeletedRecord) {
+    if (recordBytes.length == 0) {
       return Option.empty();
     }
 
@@ -71,18 +74,41 @@ public class DefaultHoodieRecordPayload extends 
OverwriteWithLatestAvroPayload {
     /*
      * Now check if the incoming record is a delete record.
      */
-    return Option.of(incomingRecord);
+    return isDeleteRecord(incomingRecord, properties) ? Option.empty() : 
Option.of(incomingRecord);
   }
 
   @Override
   public Option<IndexedRecord> getInsertValue(Schema schema, Properties 
properties) throws IOException {
-    if (recordBytes.length == 0 || isDeletedRecord) {
+    if (recordBytes.length == 0) {
       return Option.empty();
     }
     GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, 
schema);
     eventTime = updateEventTime(incomingRecord, properties);
 
-    return Option.of(incomingRecord);
+    return isDeleteRecord(incomingRecord, properties) ? Option.empty() : 
Option.of(incomingRecord);
+  }
+
+  /**
+   * @param genericRecord instance of {@link GenericRecord} of interest.
+   * @param properties payload related properties
+   * @return {@code true} if record represents a delete record. {@code false} 
otherwise.
+   */
+  protected boolean isDeleteRecord(GenericRecord genericRecord, Properties 
properties) {
+    final String deleteKey = properties.getProperty(DELETE_KEY);
+    if (StringUtils.isNullOrEmpty(deleteKey)) {
+      return isDeleteRecord(genericRecord);
+    }
+
+    
ValidationUtils.checkArgument(!StringUtils.isNullOrEmpty(properties.getProperty(DELETE_MARKER)),
+        () -> DELETE_MARKER + " should be configured with " + DELETE_KEY);
+    // Check the schema first for compatibility with newer Avro versions:
+    // they throw from GenericRecord.get if the field name does not exist
+    // in the schema.
+    if (genericRecord.getSchema().getField(deleteKey) == null) {
+      return false;
+    }
+    Object deleteMarker = genericRecord.get(deleteKey);
+    return deleteMarker != null && 
properties.getProperty(DELETE_MARKER).equals(deleteMarker.toString());
   }
 
   private static Option<Object> updateEventTime(GenericRecord record, 
Properties properties) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ValidationUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ValidationUtils.java
index ad8c53c8213..3350c9a8608 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ValidationUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ValidationUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi.common.util;
 
+import java.util.function.Supplier;
+
 /**
  * Simple utility to test validation conditions (to replace Guava's 
PreConditions)
  */
@@ -41,6 +43,13 @@ public class ValidationUtils {
     }
   }
 
+  /**
+   * Ensures the truth of an expression, throwing with the message from errorMessageSupplier 
otherwise.
+   */
+  public static void checkArgument(final boolean expression, final 
Supplier<String> errorMessageSupplier) {
+    checkArgument(expression, errorMessageSupplier.get());
+  }
+
   /**
    * Ensures the truth of an expression involving the state of the calling 
instance, but not
    * involving any parameters to the calling method.
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
index c0896e723ea..6bc0783e652 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestDefaultHoodieRecordPayload.java
@@ -33,9 +33,12 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Properties;
 
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
+import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_MARKER;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
 /**
  * Unit tests {@link DefaultHoodieRecordPayload}.
@@ -110,6 +113,68 @@ public class TestDefaultHoodieRecordPayload {
     assertFalse(payload2.combineAndGetUpdateValue(record1, schema, 
props).isPresent());
   }
 
+  @Test
+  public void testDeleteKey() throws IOException {
+    props.setProperty(DELETE_KEY, "ts");
+    props.setProperty(DELETE_MARKER, String.valueOf(1L));
+    GenericRecord record = new GenericData.Record(schema);
+    record.put("id", "1");
+    record.put("partition", "partition0");
+    record.put("ts", 0L);
+    record.put("_hoodie_is_deleted", false);
+
+    GenericRecord delRecord = new GenericData.Record(schema);
+    delRecord.put("id", "2");
+    delRecord.put("partition", "partition1");
+    delRecord.put("ts", 1L);
+    delRecord.put("_hoodie_is_deleted", false);
+
+    GenericRecord defaultDeleteRecord = new GenericData.Record(schema);
+    defaultDeleteRecord.put("id", "2");
+    defaultDeleteRecord.put("partition", "partition1");
+    defaultDeleteRecord.put("ts", 2L);
+    defaultDeleteRecord.put("_hoodie_is_deleted", true);
+
+    DefaultHoodieRecordPayload payload = new 
DefaultHoodieRecordPayload(record, 1);
+    DefaultHoodieRecordPayload deletePayload = new 
DefaultHoodieRecordPayload(delRecord, 2);
+    DefaultHoodieRecordPayload defaultDeletePayload = new 
DefaultHoodieRecordPayload(defaultDeleteRecord, 2);
+
+    assertEquals(record, payload.getInsertValue(schema, props).get());
+    assertEquals(defaultDeleteRecord, 
defaultDeletePayload.getInsertValue(schema, props).get());
+    assertFalse(deletePayload.getInsertValue(schema, props).isPresent());
+
+    assertEquals(delRecord, payload.combineAndGetUpdateValue(delRecord, 
schema, props).get());
+    assertEquals(defaultDeleteRecord, 
payload.combineAndGetUpdateValue(defaultDeleteRecord, schema, props).get());
+    assertFalse(deletePayload.combineAndGetUpdateValue(record, schema, 
props).isPresent());
+  }
+
+  @Test
+  public void testDeleteKeyConfiguration() throws IOException {
+    props.setProperty(DELETE_KEY, "ts");
+    GenericRecord record = new GenericData.Record(schema);
+    record.put("id", "1");
+    record.put("partition", "partition0");
+    record.put("ts", 0L);
+    record.put("_hoodie_is_deleted", false);
+
+    DefaultHoodieRecordPayload payload = new 
DefaultHoodieRecordPayload(record, 1);
+
+    // Verify failure when DELETE_MARKER is not configured along with 
DELETE_KEY
+    try {
+      payload.getInsertValue(schema, props).get();
+      fail("Should fail");
+    } catch (IllegalArgumentException e) {
+      // Ignore
+    }
+
+    try {
+      payload.combineAndGetUpdateValue(record, schema, props).get();
+      fail("Should fail");
+    } catch (IllegalArgumentException e) {
+      // Ignore
+    }
+  }
+
   @Test
   public void testGetEmptyMetadata() {
     GenericRecord record = new GenericData.Record(schema);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index 015ba2e2e8e..39a065f1df5 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -26,7 +26,7 @@ import org.apache.hudi.SparkAdapterSupport.sparkAdapter
 import org.apache.hudi.avro.AvroSchemaUtils.isNullable
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
-import org.apache.hudi.common.model.BaseAvroPayload.isDeleteRecord
+import org.apache.hudi.common.model.BaseAvroPayload
 import org.apache.hudi.common.model.{DefaultHoodieRecordPayload, 
HoodiePayloadProps, HoodieRecord}
 import org.apache.hudi.common.util.ValidationUtils.checkState
 import org.apache.hudi.common.util.{BinaryUtil, ValidationUtils, Option => 
HOption}
@@ -238,7 +238,7 @@ class ExpressionPayload(@transient record: GenericRecord,
     val recordSchema = getRecordSchema(properties)
     val incomingRecord = ConvertibleRecord(bytesToAvro(recordBytes, 
recordSchema))
 
-    if (isDeleteRecord(incomingRecord.asAvro)) {
+    if (BaseAvroPayload.isDeleteRecord(incomingRecord.asAvro)) {
       HOption.empty[IndexedRecord]()
     } else if (isMORTable(properties)) {
       // For the MOR table, both the matched and not-matched record will step 
into the getInsertValue() method.

Reply via email to