This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 433d7d2 [HUDI-1058] Make delete marker configurable (#1819)
433d7d2 is described below
commit 433d7d2c9886fed161557efe88b62ebdce0fe5df
Author: Shen Hong <[email protected]>
AuthorDate: Mon Aug 3 23:06:31 2020 +0800
[HUDI-1058] Make delete marker configurable (#1819)
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 9 ++
.../model/OverwriteWithLatestAvroPayload.java | 10 ++-
.../model/TestOverwriteWithLatestAvroPayload.java | 67 ++++++++++-----
.../common/testutils/HoodieTestDataGenerator.java | 27 ++++--
.../main/java/org/apache/hudi/DataSourceUtils.java | 23 +++--
.../scala/org/apache/hudi/DataSourceOptions.scala | 7 ++
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 5 +-
.../functional/HoodieSparkSqlWriterSuite.scala | 46 ++++++++++
.../hudi/utilities/deltastreamer/DeltaSync.java | 5 +-
...eltaStreamerWithOverwriteLatestAvroPayload.java | 97 ++++++++++++++++++++++
.../resources/delta-streamer-config/source.avsc | 4 +
.../sql-transformer.properties | 2 +-
.../resources/delta-streamer-config/target.avsc | 4 +
13 files changed, 264 insertions(+), 42 deletions(-)
diff --git
a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 69758f2..61e89e3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -93,6 +93,9 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public static final String BULKINSERT_SORT_MODE =
"hoodie.bulkinsert.sort.mode";
public static final String DEFAULT_BULKINSERT_SORT_MODE =
BulkInsertSortMode.GLOBAL_SORT
.toString();
+ public static final String DELETE_MARKER_FIELD_PROP =
"hoodie.write.delete.marker.field";
+ public static final String DEFAULT_DELETE_MARKER_FIELD =
"_hoodie_is_deleted";
+
public static final String EMBEDDED_TIMELINE_SERVER_ENABLED =
"hoodie.embed.timeline.server";
public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true";
@@ -266,6 +269,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig
{
return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
}
+ public String getDeleteMarkerField() {
+ return props.getProperty(DELETE_MARKER_FIELD_PROP);
+ }
+
/**
* compaction properties.
*/
@@ -900,6 +907,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
setDefaultOnCondition(props, !props.containsKey(AVRO_SCHEMA_VALIDATE),
AVRO_SCHEMA_VALIDATE, DEFAULT_AVRO_SCHEMA_VALIDATE);
setDefaultOnCondition(props, !props.containsKey(BULKINSERT_SORT_MODE),
BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE);
+ setDefaultOnCondition(props,
!props.containsKey(DELETE_MARKER_FIELD_PROP),
+ DELETE_MARKER_FIELD_PROP, DEFAULT_DELETE_MARKER_FIELD);
// Make sure the props is propagated
setDefaultOnCondition(props, !isIndexConfigSet,
HoodieIndexConfig.newBuilder().fromProperties(props).build());
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
index d8dffdf..0e4b18a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
@@ -36,6 +36,8 @@ import java.io.IOException;
public class OverwriteWithLatestAvroPayload extends BaseAvroPayload
implements HoodieRecordPayload<OverwriteWithLatestAvroPayload> {
+ private String deleteMarkerField = "_hoodie_is_deleted";
+
/**
*
*/
@@ -47,6 +49,12 @@ public class OverwriteWithLatestAvroPayload extends
BaseAvroPayload
this(record.isPresent() ? record.get() : null, (record1) -> 0); // natural
order
}
+ public OverwriteWithLatestAvroPayload(GenericRecord record, Comparable
orderingVal,
+ String deleteMarkerField) {
+ this(record, orderingVal);
+ this.deleteMarkerField = deleteMarkerField;
+ }
+
@Override
public OverwriteWithLatestAvroPayload
preCombine(OverwriteWithLatestAvroPayload another) {
// pick the payload with greatest ordering value
@@ -80,7 +88,7 @@ public class OverwriteWithLatestAvroPayload extends
BaseAvroPayload
* @returns {@code true} if record represents a delete record. {@code false}
otherwise.
*/
private boolean isDeleteRecord(GenericRecord genericRecord) {
- Object deleteMarker = genericRecord.get("_hoodie_is_deleted");
+ Object deleteMarker = genericRecord.get(deleteMarkerField);
return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
}
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
index 7c5951a..e212367 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestOverwriteWithLatestAvroPayload.java
@@ -37,6 +37,8 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
public class TestOverwriteWithLatestAvroPayload {
private Schema schema;
+ String defaultDeleteMarkerField = "_hoodie_is_deleted";
+ String deleteMarkerField = "delete_marker_field";
@BeforeEach
public void setUp() throws Exception {
@@ -44,26 +46,56 @@ public class TestOverwriteWithLatestAvroPayload {
new Schema.Field("id", Schema.create(Schema.Type.STRING), "", null),
new Schema.Field("partition", Schema.create(Schema.Type.STRING), "",
null),
new Schema.Field("ts", Schema.create(Schema.Type.LONG), "", null),
- new Schema.Field("_hoodie_is_deleted", Schema.create(Type.BOOLEAN),
"", false)
+ new Schema.Field(defaultDeleteMarkerField,
Schema.create(Type.BOOLEAN), "", false),
+ new Schema.Field(deleteMarkerField, Schema.create(Type.BOOLEAN), "",
false)
));
}
@Test
- public void testActiveRecords() throws IOException {
+ public void testOverwriteWithLatestAvroPayload() throws IOException {
GenericRecord record1 = new GenericData.Record(schema);
record1.put("id", "1");
record1.put("partition", "partition0");
record1.put("ts", 0L);
- record1.put("_hoodie_is_deleted", false);
+ record1.put(defaultDeleteMarkerField, false);
+ record1.put(deleteMarkerField, false);
+ // test1: set default marker field value to true and user defined to false
GenericRecord record2 = new GenericData.Record(schema);
record2.put("id", "2");
record2.put("partition", "partition1");
record2.put("ts", 1L);
- record2.put("_hoodie_is_deleted", false);
+ record2.put(defaultDeleteMarkerField, true);
+ record2.put(deleteMarkerField, false);
+
+ // set to user defined marker field with false, the record should be
considered active.
+ assertActiveRecord(record1, record2, deleteMarkerField);
+
+ // set to default marker field with true, the record should be considered
deleted.
+ assertDeletedRecord(record1, record2, defaultDeleteMarkerField);
+
+ // test2: set default marker field value to false and user defined to true
+ GenericRecord record3 = new GenericData.Record(schema);
+ record3.put("id", "2");
+ record3.put("partition", "partition1");
+ record3.put("ts", 1L);
+ record3.put(defaultDeleteMarkerField, false);
+ record3.put(deleteMarkerField, true);
+
+ // set to user defined marker field with true, the record should be
considered deleted.
+ assertDeletedRecord(record1, record3, deleteMarkerField);
+
+ // set to default marker field with false, the record should be considered
active.
+ assertActiveRecord(record1, record3, defaultDeleteMarkerField);
+ }
+
+ private void assertActiveRecord(GenericRecord record1,
+ GenericRecord record2, String field) throws
IOException {
+ OverwriteWithLatestAvroPayload payload1 = new
OverwriteWithLatestAvroPayload(
+ record1, 1, field);
+ OverwriteWithLatestAvroPayload payload2 = new
OverwriteWithLatestAvroPayload(
+ record2, 2, field);
- OverwriteWithLatestAvroPayload payload1 = new
OverwriteWithLatestAvroPayload(record1, 1);
- OverwriteWithLatestAvroPayload payload2 = new
OverwriteWithLatestAvroPayload(record2, 2);
assertEquals(payload1.preCombine(payload2), payload2);
assertEquals(payload2.preCombine(payload1), payload2);
@@ -74,22 +106,12 @@ public class TestOverwriteWithLatestAvroPayload {
assertEquals(payload2.combineAndGetUpdateValue(record1, schema).get(),
record2);
}
- @Test
- public void testDeletedRecord() throws IOException {
- GenericRecord record1 = new GenericData.Record(schema);
- record1.put("id", "1");
- record1.put("partition", "partition0");
- record1.put("ts", 0L);
- record1.put("_hoodie_is_deleted", false);
-
- GenericRecord delRecord1 = new GenericData.Record(schema);
- delRecord1.put("id", "2");
- delRecord1.put("partition", "partition1");
- delRecord1.put("ts", 1L);
- delRecord1.put("_hoodie_is_deleted", true);
-
- OverwriteWithLatestAvroPayload payload1 = new
OverwriteWithLatestAvroPayload(record1, 1);
- OverwriteWithLatestAvroPayload payload2 = new
OverwriteWithLatestAvroPayload(delRecord1, 2);
+ private void assertDeletedRecord(GenericRecord record1,
+ GenericRecord delRecord1, String field)
throws IOException {
+ OverwriteWithLatestAvroPayload payload1 = new
OverwriteWithLatestAvroPayload(
+ record1, 1, field);
+ OverwriteWithLatestAvroPayload payload2 = new
OverwriteWithLatestAvroPayload(
+ delRecord1, 2, field);
assertEquals(payload1.preCombine(payload2), payload2);
assertEquals(payload2.preCombine(payload1), payload2);
@@ -99,5 +121,4 @@ public class TestOverwriteWithLatestAvroPayload {
assertEquals(payload1.combineAndGetUpdateValue(delRecord1, schema).get(),
record1);
assertFalse(payload2.combineAndGetUpdateValue(record1,
schema).isPresent());
}
-
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index ed7a458..841a74a 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -105,6 +105,7 @@ public class HoodieTestDataGenerator {
+ "{\"name\": \"seconds_since_epoch\", \"type\": \"long\"},"
+ "{\"name\": \"weight\", \"type\": \"float\"},"
+ "{\"name\": \"nation\", \"type\": \"bytes\"},"
+ + "{\"name\": \"user_defined_delete_marker_field\", \"type\":
\"boolean\", \"default\": false},"
+ "{\"name\":\"current_date\",\"type\": {\"type\": \"int\",
\"logicalType\": \"date\"}},"
+ "{\"name\":\"current_ts\",\"type\": {\"type\": \"long\",
\"logicalType\": \"timestamp-micros\"}},"
+
"{\"name\":\"height\",\"type\":{\"type\":\"fixed\",\"name\":\"abc\",\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":6}},";
@@ -122,7 +123,7 @@ public class HoodieTestDataGenerator {
+
"{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\":
\"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
public static final String NULL_SCHEMA =
Schema.create(Schema.Type.NULL).toString();
- public static final String TRIP_HIVE_COLUMN_TYPES =
"double,string,string,string,double,double,double,double,int,bigint,float,binary,int,bigint,decimal(10,6),"
+ public static final String TRIP_HIVE_COLUMN_TYPES =
"double,string,string,string,double,double,double,double,int,bigint,float,binary,boolean,int,bigint,decimal(10,6),"
+
"map<string,string>,struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";
public static final Schema AVRO_SCHEMA = new
Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
@@ -177,6 +178,18 @@ public class HoodieTestDataGenerator {
return null;
}
+ public static List<GenericRecord> generateGenericRecords(int n, boolean
isDeleteRecord, int instantTime) {
+ return IntStream.range(0, n).boxed().map(i -> {
+ String partitionPath = DEFAULT_FIRST_PARTITION_PATH;
+ HoodieKey key = new HoodieKey("id_" + i, partitionPath);
+ HoodieTestDataGenerator.KeyPartition kp = new
HoodieTestDataGenerator.KeyPartition();
+ kp.key = key;
+ kp.partitionPath = partitionPath;
+ return HoodieTestDataGenerator.generateGenericRecord(
+ key.getRecordKey(), "rider-" + instantTime, "driver-" +
instantTime, instantTime, isDeleteRecord, false);
+ }).collect(Collectors.toList());
+ }
+
/**
* Generates a new avro record of the above nested schema format,
* retaining the key if optionally provided.
@@ -263,11 +276,11 @@ public class HoodieTestDataGenerator {
rec.put("weight", RAND.nextFloat());
byte[] bytes = "Canada".getBytes();
rec.put("nation", ByteBuffer.wrap(bytes));
+ rec.put("user_defined_delete_marker_field", isDeleteRecord);
long currentTimeMillis = System.currentTimeMillis();
Date date = new Date(currentTimeMillis);
rec.put("current_date", (int) date.toLocalDate().toEpochDay());
rec.put("current_ts", currentTimeMillis);
-
BigDecimal bigDecimal = new BigDecimal(String.format("%5f",
RAND.nextFloat()));
Schema decimalSchema = AVRO_SCHEMA.getField("height").schema();
Conversions.DecimalConversion decimalConversions = new
Conversions.DecimalConversion();
@@ -290,11 +303,7 @@ public class HoodieTestDataGenerator {
rec.put("tip_history", tipHistoryArray);
}
- if (isDeleteRecord) {
- rec.put("_hoodie_is_deleted", true);
- } else {
- rec.put("_hoodie_is_deleted", false);
- }
+ rec.put("_hoodie_is_deleted", isDeleteRecord);
return rec;
}
@@ -761,8 +770,8 @@ public class HoodieTestDataGenerator {
public static class KeyPartition implements Serializable {
- HoodieKey key;
- String partitionPath;
+ public HoodieKey key;
+ public String partitionPath;
}
public void close() {
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index a4e7472..5740f4b 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -215,11 +216,20 @@ public class DataSourceUtils {
/**
* Create a payload class via reflection, passing in an ordering/precombine
value.
*/
- public static HoodieRecordPayload createPayload(String payloadClass,
GenericRecord record, Comparable orderingVal)
- throws IOException {
+ public static HoodieRecordPayload createPayload(String payloadClass,
GenericRecord record,
+ Comparable orderingVal,
+ String deleteMarkerField)
throws IOException {
try {
- return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
- new Class<?>[] {GenericRecord.class, Comparable.class}, record,
orderingVal);
+ HoodieRecordPayload payload = null;
+ if (payloadClass.equals(OverwriteWithLatestAvroPayload.class.getName()))
{
+ payload = (OverwriteWithLatestAvroPayload)
ReflectionUtils.loadClass(payloadClass,
+ new Class<?>[]{GenericRecord.class, Comparable.class,
String.class},
+ record, orderingVal, deleteMarkerField);
+ } else {
+ payload = (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
+ new Class<?>[]{GenericRecord.class, Comparable.class}, record,
orderingVal);
+ }
+ return payload;
} catch (Throwable e) {
throw new IOException("Could not create payload for class: " +
payloadClass, e);
}
@@ -275,8 +285,9 @@ public class DataSourceUtils {
}
public static HoodieRecord createHoodieRecord(GenericRecord gr, Comparable
orderingVal, HoodieKey hKey,
- String payloadClass) throws
IOException {
- HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass,
gr, orderingVal);
+ String payloadClass,
+ String deleteMarkerField)
throws IOException {
+ HoodieRecordPayload payload = DataSourceUtils.createPayload(payloadClass,
gr, orderingVal, deleteMarkerField);
return new HoodieRecord<>(hKey, payload);
}
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 5195f05..bb26302 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -185,6 +185,13 @@ object DataSourceWriteOptions {
val DEFAULT_PAYLOAD_OPT_VAL = classOf[OverwriteWithLatestAvroPayload].getName
/**
+ * Field used in OverwriteWithLatestAvroPayload combineAndGetUpdateValue,
When two records have the same
+ * key value, we will check if the new record is deleted by the delete field.
+ */
+ val DELETE_FIELD_OPT_KEY = "hoodie.datasource.write.delete.field"
+ val DEFAULT_DELETE_FIELD_OPT_VAL = "_hoodie_is_deleted"
+
+ /**
* Record key field. Value to be used as the `recordKey` component of
`HoodieKey`. Actual value
* will be obtained by invoking .toString() on the field value. Nested
fields can be specified using
* the dot notation eg: `a.b.c`
diff --git
a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 3ba34cb..f8f388e 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -110,7 +110,9 @@ private[hudi] object HoodieSparkSqlWriter {
val orderingVal = DataSourceUtils.getNestedFieldVal(gr,
parameters(PRECOMBINE_FIELD_OPT_KEY), false)
.asInstanceOf[Comparable[_]]
DataSourceUtils.createHoodieRecord(gr,
- orderingVal, keyGenerator.getKey(gr),
parameters(PAYLOAD_CLASS_OPT_KEY))
+ orderingVal, keyGenerator.getKey(gr),
+ parameters(PAYLOAD_CLASS_OPT_KEY),
+ parameters(DELETE_FIELD_OPT_KEY))
}).toJavaRDD()
// Handle various save modes
@@ -202,6 +204,7 @@ private[hudi] object HoodieSparkSqlWriter {
TABLE_TYPE_OPT_KEY -> DEFAULT_TABLE_TYPE_OPT_VAL,
PRECOMBINE_FIELD_OPT_KEY -> DEFAULT_PRECOMBINE_FIELD_OPT_VAL,
PAYLOAD_CLASS_OPT_KEY -> DEFAULT_PAYLOAD_OPT_VAL,
+ DELETE_FIELD_OPT_KEY -> DEFAULT_DELETE_FIELD_OPT_VAL,
RECORDKEY_FIELD_OPT_KEY -> DEFAULT_RECORDKEY_FIELD_OPT_VAL,
PARTITIONPATH_FIELD_OPT_KEY -> DEFAULT_PARTITIONPATH_FIELD_OPT_VAL,
KEYGENERATOR_CLASS_OPT_KEY -> DEFAULT_KEYGENERATOR_CLASS_OPT_VAL,
diff --git
a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index 7f26481..29dac2b 100644
---
a/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++
b/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -100,6 +100,52 @@ class HoodieSparkSqlWriterSuite extends FunSuite with
Matchers {
}
}
+ test("test OverwriteWithLatestAvroPayload with user defined delete field") {
+ val session = SparkSession.builder()
+ .appName("test_append_mode")
+ .master("local[2]")
+ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .getOrCreate()
+ val path = java.nio.file.Files.createTempDirectory("hoodie_test_path1")
+
+ try {
+ val sqlContext = session.sqlContext
+ val hoodieFooTableName = "hoodie_foo_tbl"
+
+ val keyField = "id"
+ val deleteMarkerField = "delete_field"
+
+ //create a new table
+ val fooTableModifier = Map("path" -> path.toAbsolutePath.toString,
+ HoodieWriteConfig.TABLE_NAME -> hoodieFooTableName,
+ "hoodie.insert.shuffle.parallelism" -> "2",
+ "hoodie.upsert.shuffle.parallelism" -> "2",
+ DELETE_FIELD_OPT_KEY -> deleteMarkerField,
+ RECORDKEY_FIELD_OPT_KEY -> keyField)
+ val fooTableParams =
HoodieSparkSqlWriter.parametersWithWriteDefaults(fooTableModifier)
+
+ val id1 = UUID.randomUUID().toString
+ val dataFrame = session.createDataFrame(Seq(
+ (id1, 1, false)
+ )) toDF(keyField, "ts", deleteMarkerField)
+
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams,
dataFrame)
+ val recordCount1 =
sqlContext.read.format("org.apache.hudi").load(path.toString +
"/*/*.parquet").count
+ assert(recordCount1 == 1, "result should be 1, but get " + recordCount1)
+
+ val dataFrame2 = session.createDataFrame(Seq(
+ (id1, 2, true)
+ )) toDF(keyField, "ts", deleteMarkerField)
+ HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableParams,
dataFrame2)
+
+ val recordCount2 =
sqlContext.read.format("org.apache.hudi").load(path.toString +
"/*/*.parquet").count()
+ assert(recordCount2 == 0, "result should be 0, but get " + recordCount2)
+ } finally {
+ session.stop()
+ FileUtils.deleteDirectory(path.toFile)
+ }
+ }
+
case class Test(uuid: String, ts: Long)
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index e7226fb..0db486e 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -338,9 +338,12 @@ public class DeltaSync implements Serializable {
}
JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
+ String deleteMarkerField =
props.getString(HoodieWriteConfig.DELETE_MARKER_FIELD_PROP,
+ HoodieWriteConfig.DEFAULT_DELETE_MARKER_FIELD);
JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
HoodieRecordPayload payload =
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
- (Comparable) DataSourceUtils.getNestedFieldVal(gr,
cfg.sourceOrderingField, false));
+ (Comparable) DataSourceUtils.getNestedFieldVal(gr,
cfg.sourceOrderingField, false),
+ deleteMarkerField);
return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
});
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java
new file mode 100644
index 0000000..f98eb79
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestDeltaStreamerWithOverwriteLatestAvroPayload.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
+import org.apache.hudi.utilities.sources.ParquetDFSSource;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestDeltaStreamerWithOverwriteLatestAvroPayload extends
UtilitiesTestBase {
+ private static String PARQUET_SOURCE_ROOT;
+ private static final String PROPS_FILENAME_TEST_PARQUET =
"test-parquet-dfs-source.properties";
+
+ @BeforeAll
+ public static void initClass() throws Exception {
+ UtilitiesTestBase.initClass(true);
+ PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+
+ // prepare the configs.
+
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties",
dfs, dfsBasePath + "/base.properties");
+
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties",
dfs,
+ dfsBasePath + "/sql-transformer.properties");
+ UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc",
dfs, dfsBasePath + "/source.avsc");
+
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc",
dfs, dfsBasePath + "/source-flattened.avsc");
+ UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc",
dfs, dfsBasePath + "/target.avsc");
+ }
+
+ @Test
+ public void testOverwriteLatestAvroPayload() throws Exception {
+ // test defaultDeleteMarkerField
+ this.testOverwriteLatestAvroPayload(null);
+
+ // test userDefinedDeleteMarkerField
+ this.testOverwriteLatestAvroPayload("user_defined_delete_marker_field");
+ }
+
+ private void testOverwriteLatestAvroPayload(String deleteMarkerField) throws
Exception {
+ String path = PARQUET_SOURCE_ROOT + "/1.parquet";
+ List<GenericRecord> records =
HoodieTestDataGenerator.generateGenericRecords(5, false, 0);
+ Helpers.saveParquetToDFS(records, new Path(path));
+
+ TypedProperties parquetProps = new TypedProperties();
+ parquetProps.setProperty("include", "base.properties");
+ parquetProps.setProperty("hoodie.datasource.write.recordkey.field",
"_row_key");
+ parquetProps.setProperty("hoodie.datasource.write.partitionpath.field",
"not_there");
+ parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root",
PARQUET_SOURCE_ROOT);
+ if (deleteMarkerField != null) {
+ parquetProps.setProperty(HoodieWriteConfig.DELETE_MARKER_FIELD_PROP,
deleteMarkerField);
+ }
+ Helpers.savePropsToDFS(parquetProps, dfs, dfsBasePath + "/" +
PROPS_FILENAME_TEST_PARQUET);
+
+ String tableBasePath = dfsBasePath +
"/test_overwrite_latest_avro_payload_table";
+
+ HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
+ TestHoodieDeltaStreamer.TestHelpers.makeConfig(tableBasePath,
HoodieDeltaStreamer.Operation.INSERT, ParquetDFSSource.class.getName(),
+ null, PROPS_FILENAME_TEST_PARQUET, false,
+ false, 100000, false, null, null, "timestamp"), jsc);
+ deltaStreamer.sync();
+ TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, tableBasePath +
"/*/*.parquet", sqlContext);
+
+ String path2 = PARQUET_SOURCE_ROOT + "/2.parquet";
+ List<GenericRecord> records2 =
HoodieTestDataGenerator.generateGenericRecords(4, true, 1);
+ Helpers.saveParquetToDFS(records2, new Path(path2));
+ deltaStreamer.sync();
+
+ List<Row> rows =
sqlContext.read().format("org.apache.hudi").load(tableBasePath +
"/*/*.parquet").collectAsList();
+ assertEquals(1, rows.size());
+ assertEquals(records.get(4).get("_row_key"), rows.get(0).getString(2));
+ }
+}
diff --git
a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
index f5cc97f..bda782e 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/source.avsc
@@ -56,6 +56,10 @@
"name" : "nation",
"type" : "bytes"
},{
+ "name" : "user_defined_delete_marker_field",
+ "type" : "boolean",
+ "default" : false
+ },{
"name" : "current_date",
"type" : {
"type" : "int",
diff --git
a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
index dc735e8..5eaa0a7 100644
---
a/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
+++
b/hudi-utilities/src/test/resources/delta-streamer-config/sql-transformer.properties
@@ -16,4 +16,4 @@
# limitations under the License.
###
include=base.properties
-hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider,
a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.distance_in_meters,
a.seconds_since_epoch, a.weight, a.nation, a.current_date, a.current_ts,
a.height, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`,
CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a
+hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.rider,
a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.distance_in_meters,
a.seconds_since_epoch, a.weight, a.nation, a.user_defined_delete_marker_field,
a.current_date, a.current_ts, a.height, a.city_to_state, a.fare, a.tip_history,
a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a
diff --git
a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
index a026107..a17e3dd 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/target.avsc
@@ -56,6 +56,10 @@
"name" : "nation",
"type" : "bytes"
},{
+ "name" : "user_defined_delete_marker_field",
+ "type" : "boolean",
+ "default" : false
+ },{
"name" : "current_date",
"type" : {
"type" : "int",