codope commented on code in PR #7961:
URL: https://github.com/apache/hudi/pull/7961#discussion_r1108758019
##########
hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java:
##########
@@ -37,8 +37,10 @@
* 1. preCombine - Picks the latest delta record for a key, based on an ordering field 2. combineAndGetUpdateValue/getInsertValue - Chooses the latest record based on ordering field value.
*/
public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
-
+ private static final String DEFAULT_DELETE_MARKER = "false";
public static final String METADATA_EVENT_TIME_KEY = "metadata.event_time.key";
+ public static final String DELETE_KEY = "hoodie.payload.delete.field";
+ public static final String DELETE_MARKER = "hoodie.payload.delete.marker";
Review Comment:
Should we validate that DELETE_MARKER is also set whenever DELETE_KEY is
set? Is there any point in having one configured without the other?
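A minimal sketch of the validation suggested here, not the PR's actual code. The class name `DeleteConfigValidation` and the helper `validateDeleteConfig` are hypothetical; only the two property names are taken from the diff. It assumes the desired rule is "both set or neither set", which is one reading of the comment:

```java
import java.util.Properties;

public class DeleteConfigValidation {
  // Property names copied from the diff under review.
  static final String DELETE_KEY = "hoodie.payload.delete.field";
  static final String DELETE_MARKER = "hoodie.payload.delete.marker";

  /**
   * Hypothetical helper: rejects a configuration in which exactly one of
   * the two delete properties is present.
   */
  static void validateDeleteConfig(Properties properties) {
    boolean hasKey = properties.getProperty(DELETE_KEY) != null;
    boolean hasMarker = properties.getProperty(DELETE_MARKER) != null;
    if (hasKey != hasMarker) {
      throw new IllegalArgumentException(
          DELETE_KEY + " and " + DELETE_MARKER + " must be set together");
    }
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    validateDeleteConfig(props); // neither property set: accepted

    props.setProperty(DELETE_KEY, "op");
    props.setProperty(DELETE_MARKER, "d");
    validateDeleteConfig(props); // both properties set: accepted
  }
}
```

If the PR instead intends DELETE_MARKER to fall back to a default when only DELETE_KEY is set, the check would need to be relaxed to flag only a marker configured without a key.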
##########
hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java:
##########
@@ -71,18 +73,38 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
/*
* Now check if the incoming record is a delete record.
*/
- return Option.of(incomingRecord);
+ return isDeleteRecord(incomingRecord, properties) ? Option.empty() : Option.of(incomingRecord);
}
@Override
public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException {
- if (recordBytes.length == 0 || isDeletedRecord) {
+ if (recordBytes.length == 0) {
return Option.empty();
}
GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
eventTime = updateEventTime(incomingRecord, properties);
- return Option.of(incomingRecord);
+ return isDeleteRecord(incomingRecord, properties) ? Option.empty() : Option.of(incomingRecord);
+ }
+
+ /**
+ * @param genericRecord instance of {@link GenericRecord} of interest.
+ * @param properties payload related properties
+ * @returns {@code true} if record represents a delete record. {@code false} otherwise.
+ */
+ protected boolean isDeleteRecord(GenericRecord genericRecord, Properties properties) {
+ final String deleteKey = properties.getProperty(DELETE_KEY);
+ if (deleteKey == null) {
+ return super.isDeleteRecord(genericRecord);
Review Comment:
Agreed, DELETE_MARKER should be required when DELETE_KEY is set. If
DELETE_KEY is null, however, falling back to the parent implementation is
fine.
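The behavior described in this comment can be sketched as follows. This is an illustration under stated assumptions, not the PR's implementation: a `Map<String, Object>` stands in for Avro's `GenericRecord`, `parentIsDeleteRecord` is a hypothetical stand-in for `super.isDeleteRecord(genericRecord)`, the built-in flag name `_hoodie_is_deleted` is assumed, and comparing the field's string form to the marker is an assumed matching rule, since the diff is cut off before that part of the method:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class DeleteCheckSketch {
  // Property names copied from the diff under review.
  static final String DELETE_KEY = "hoodie.payload.delete.field";
  static final String DELETE_MARKER = "hoodie.payload.delete.marker";
  // Assumed name of the built-in delete flag checked by the parent payload.
  static final String BUILT_IN_DELETE_FIELD = "_hoodie_is_deleted";

  /** Stand-in for the parent check: true only when the built-in flag is Boolean true. */
  static boolean parentIsDeleteRecord(Map<String, Object> record) {
    return Boolean.TRUE.equals(record.get(BUILT_IN_DELETE_FIELD));
  }

  static boolean isDeleteRecord(Map<String, Object> record, Properties properties) {
    String deleteKey = properties.getProperty(DELETE_KEY);
    if (deleteKey == null) {
      // No custom delete field configured: defer to the parent behavior.
      return parentIsDeleteRecord(record);
    }
    String deleteMarker = properties.getProperty(DELETE_MARKER);
    if (deleteMarker == null) {
      // A custom delete field without a marker is treated as a config error.
      throw new IllegalArgumentException(
          DELETE_MARKER + " must be set when " + DELETE_KEY + " is set");
    }
    // Assumed rule: the record is a delete when the field's string form equals the marker.
    Object value = record.get(deleteKey);
    return value != null && deleteMarker.equals(value.toString());
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(DELETE_KEY, "op");
    props.setProperty(DELETE_MARKER, "d");

    Map<String, Object> record = new HashMap<>();
    record.put("op", "d");
    System.out.println(isDeleteRecord(record, props));
  }
}
```

The three branches (no key, key without marker, key with marker) are also the cases a unit test for this method would most likely need to cover.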
##########
hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java:
##########
@@ -71,18 +73,38 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
/*
* Now check if the incoming record is a delete record.
*/
- return Option.of(incomingRecord);
+ return isDeleteRecord(incomingRecord, properties) ? Option.empty() : Option.of(incomingRecord);
}
@Override
public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties) throws IOException {
- if (recordBytes.length == 0 || isDeletedRecord) {
+ if (recordBytes.length == 0) {
return Option.empty();
}
GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
eventTime = updateEventTime(incomingRecord, properties);
- return Option.of(incomingRecord);
+ return isDeleteRecord(incomingRecord, properties) ? Option.empty() : Option.of(incomingRecord);
+ }
+
+ /**
+ * @param genericRecord instance of {@link GenericRecord} of interest.
+ * @param properties payload related properties
+ * @returns {@code true} if record represents a delete record. {@code false} otherwise.
+ */
+ protected boolean isDeleteRecord(GenericRecord genericRecord, Properties properties) {
Review Comment:
Can we add a test that covers this new isDeleteRecord(GenericRecord,
Properties) method?