This is an automated email from the ASF dual-hosted git repository.
akudinkin pushed a commit to branch release-feature-rfc46
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/release-feature-rfc46 by this
push:
new df83709f51 [Minor] fix multi deser avro payload (#7021)
df83709f51 is described below
commit df83709f51b7df32ec15e69b97fa6631f3883ce5
Author: komao <[email protected]>
AuthorDate: Tue Nov 29 07:54:51 2022 +0800
[Minor] fix multi deser avro payload (#7021)
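In HoodieAvroRecord, isDelete and shouldIgnore are both called before a record
is written to the file, and each of these calls deserializes the
HoodieRecordPayload. To avoid this repeated deserialization, BaseAvroPayload now
determines once, when the payload is constructed, whether the record is a delete
(exposed via isDeleted). shouldIgnore only deserializes payloads that can produce
HoodieRecord.SENTINEL (see canProduceSentinel).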
Co-authored-by: wangzixuan.wzxuan <[email protected]>
Co-authored-by: Alexey Kudinkin <[email protected]>
Co-authored-by: Alexey Kudinkin <[email protected]>
---
.../org/apache/hudi/io/HoodieAppendHandle.java | 26 ++++++------
.../java/org/apache/hudi/io/HoodieMergeHandle.java | 1 -
.../hudi/commmon/model/HoodieSparkRecord.java | 30 +++++++++-----
.../apache/hudi/common/model/BaseAvroPayload.java | 46 +++++++++++++++++++++-
.../common/model/DefaultHoodieRecordPayload.java | 8 ++--
.../hudi/common/model/EventTimeAvroPayload.java | 9 ++---
.../apache/hudi/common/model/HoodieAvroRecord.java | 37 +++++++++++------
.../org/apache/hudi/common/model/HoodieRecord.java | 17 +++++++-
.../model/OverwriteWithLatestAvroPayload.java | 24 +----------
.../common/model/PartialUpdateAvroPayload.java | 1 -
.../apache/hudi/sink/utils/PayloadCreation.java | 2 +-
.../main/java/org/apache/hudi/QuickstartUtils.java | 12 +++---
.../hudi/command/payload/ExpressionPayload.scala | 23 ++++++++---
.../model/TestHoodieRecordSerialization.scala | 2 +-
14 files changed, 154 insertions(+), 84 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 564d63ba77..2ef02b1dae 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -35,6 +35,7 @@ import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodiePayloadProps;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.IOType;
@@ -215,18 +216,16 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
// If the format can not record the operation field, nullify the DELETE
payload manually.
boolean nullifyPayload =
HoodieOperation.isDelete(hoodieRecord.getOperation()) &&
!config.allowOperationMetadataField();
recordProperties.put(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR,
String.valueOf(isUpdateRecord));
- Option<HoodieRecord> finalRecord = Option.empty();
- if (!nullifyPayload && !hoodieRecord.isDelete(tableSchema,
recordProperties)) {
- if (hoodieRecord.shouldIgnore(tableSchema, recordProperties)) {
- return Option.of(hoodieRecord);
+ Option<HoodieRecord> finalRecord = nullifyPayload ? Option.empty() :
Option.of(hoodieRecord);
+ // Check for delete
+ if (finalRecord.isPresent() && !finalRecord.get().isDelete(tableSchema,
recordProperties)) {
+ // Check for ignore ExpressionPayload
+ if (finalRecord.get().shouldIgnore(tableSchema, recordProperties)) {
+ return finalRecord;
}
// Convert GenericRecord to GenericRecord with hoodie commit metadata
in schema
- HoodieRecord rewrittenRecord;
- if (schemaOnReadEnabled) {
- rewrittenRecord =
hoodieRecord.rewriteRecordWithNewSchema(tableSchema, recordProperties,
writeSchemaWithMetaFields);
- } else {
- rewrittenRecord = hoodieRecord.rewriteRecord(tableSchema,
recordProperties, writeSchemaWithMetaFields);
- }
+ HoodieRecord rewrittenRecord = schemaOnReadEnabled ?
finalRecord.get().rewriteRecordWithNewSchema(tableSchema, recordProperties,
writeSchemaWithMetaFields)
+ : finalRecord.get().rewriteRecord(tableSchema, recordProperties,
writeSchemaWithMetaFields);
HoodieRecord populatedRecord = populateMetadataFields(rewrittenRecord,
writeSchemaWithMetaFields, recordProperties);
finalRecord = Option.of(populatedRecord);
if (isUpdateRecord) {
@@ -236,6 +235,7 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
}
recordsWritten++;
} else {
+ finalRecord = Option.empty();
recordsDeleted++;
}
@@ -364,7 +364,9 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
updateWriteStatus(stat, result);
}
- if (config.isMetadataColumnStatsIndexEnabled()) {
+ // TODO MetadataColumnStatsIndex for spark record
+ // https://issues.apache.org/jira/browse/HUDI-5249
+ if (config.isMetadataColumnStatsIndexEnabled() &&
recordMerger.getRecordType() == HoodieRecordType.AVRO) {
final List<Schema.Field> fieldsToIndex;
// If column stats index is enabled but columns not configured then we
assume that
// all columns should be indexed
@@ -511,7 +513,7 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
record.seal();
}
// fetch the ordering val first in case the record was deflated.
- final Comparable<?> orderingVal = record.getOrderingValue(tableSchema,
config.getProps());
+ final Comparable<?> orderingVal = record.getOrderingValue(tableSchema,
recordProperties);
Option<HoodieRecord> indexedRecord = prepareRecord(record);
if (indexedRecord.isPresent()) {
// Skip the ignored record.
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 276b318890..4e5370f108 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -347,7 +347,6 @@ public class HoodieMergeHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O>
Option<Pair<HoodieRecord, Schema>> mergeResult =
recordMerger.merge(oldRecord, oldSchema, newRecord, newSchema, props);
Schema combineRecordSchema =
mergeResult.map(Pair::getRight).orElse(null);
Option<HoodieRecord> combinedRecord = mergeResult.map(Pair::getLeft);
-
if (combinedRecord.isPresent() &&
combinedRecord.get().shouldIgnore(combineRecordSchema, props)) {
// If it is an IGNORE_RECORD, just copy the old record, and do not
update the new record.
copyOldRecord = true;
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
index 43000d1964..19b8cb5c65 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/commmon/model/HoodieSparkRecord.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
@@ -122,6 +123,19 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> implements Kryo
this.schema = schema;
}
+ public HoodieSparkRecord(
+ HoodieKey key,
+ InternalRow data,
+ StructType schema,
+ HoodieOperation operation,
+ HoodieRecordLocation currentLocation,
+ HoodieRecordLocation newLocation,
+ boolean copy) {
+ super(key, data, operation, currentLocation, newLocation);
+ this.copy = copy;
+ this.schema = schema;
+ }
+
@Override
public HoodieSparkRecord newInstance() {
return new HoodieSparkRecord(this.key, this.data, this.schema,
this.operation, this.copy);
@@ -175,7 +189,7 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> implements Kryo
InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData());
UnsafeProjection projection =
HoodieInternalRowUtils.getCachedUnsafeProjection(targetStructType,
targetStructType);
- return new HoodieSparkRecord(getKey(), projection.apply(mergeRow),
targetStructType, getOperation(), copy);
+ return new HoodieSparkRecord(getKey(), projection.apply(mergeRow),
targetStructType, getOperation(), this.currentLocation, this.newLocation, copy);
}
@Override
@@ -189,7 +203,7 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> implements Kryo
// TODO add actual rewriting
InternalRow finalRow = new HoodieInternalRow(metaFields, data,
containMetaFields);
- return new HoodieSparkRecord(getKey(), finalRow, targetStructType,
getOperation(), copy);
+ return new HoodieSparkRecord(getKey(), finalRow, targetStructType,
getOperation(), this.currentLocation, this.newLocation, copy);
}
@Override
@@ -204,7 +218,7 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> implements Kryo
HoodieInternalRowUtils.rewriteRecordWithNewSchema(data, structType,
newStructType, renameCols);
HoodieInternalRow finalRow = new HoodieInternalRow(metaFields,
rewrittenRow, containMetaFields);
- return new HoodieSparkRecord(getKey(), finalRow, newStructType,
getOperation(), copy);
+ return new HoodieSparkRecord(getKey(), finalRow, newStructType,
getOperation(), this.currentLocation, this.newLocation, copy);
}
@Override
@@ -219,7 +233,7 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> implements Kryo
}
});
- return new HoodieSparkRecord(getKey(), updatableRow, structType,
getOperation(), copy);
+ return new HoodieSparkRecord(getKey(), updatableRow, structType,
getOperation(), this.currentLocation, this.newLocation, copy);
}
@Override
@@ -244,11 +258,7 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> implements Kryo
@Override
public boolean shouldIgnore(Schema recordSchema, Properties props) throws
IOException {
- if (data != null && data.equals(SENTINEL)) {
- return true;
- } else {
- return false;
- }
+ return false;
}
@Override
@@ -284,7 +294,7 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> implements Kryo
partition =
data.get(HoodieMetadataField.PARTITION_PATH_METADATA_FIELD.ordinal(),
StringType).toString();
}
HoodieKey hoodieKey = new HoodieKey(key, partition);
- return new HoodieSparkRecord(hoodieKey, data, structType, getOperation(),
copy);
+ return new HoodieSparkRecord(hoodieKey, data, structType, getOperation(),
this.currentLocation, this.newLocation, copy);
}
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
index cd3a95e6bf..aaafe61abf 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseAvroPayload.java
@@ -21,9 +21,11 @@ package org.apache.hudi.common.model;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.exception.HoodieException;
+import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import java.io.Serializable;
+import java.util.Properties;
/**
* Base class for all AVRO record based payloads, that can be ordered based on
a field.
@@ -32,12 +34,14 @@ public abstract class BaseAvroPayload implements
Serializable {
/**
* Avro data extracted from the source converted to bytes.
*/
- public final byte[] recordBytes;
+ protected final byte[] recordBytes;
/**
* For purposes of preCombining.
*/
- public final Comparable orderingVal;
+ protected final Comparable orderingVal;
+
+ protected final boolean isDeletedRecord;
/**
* Instantiate {@link BaseAvroPayload}.
@@ -48,8 +52,46 @@ public abstract class BaseAvroPayload implements
Serializable {
public BaseAvroPayload(GenericRecord record, Comparable orderingVal) {
this.recordBytes = record != null ? HoodieAvroUtils.avroToBytes(record) :
new byte[0];
this.orderingVal = orderingVal;
+ this.isDeletedRecord = record == null || isDeleteRecord(record);
+
if (orderingVal == null) {
throw new HoodieException("Ordering value is null for record: " +
record);
}
}
+
+ public Comparable getOrderingVal() {
+ return orderingVal;
+ }
+
+ /**
+ * Defines whether this implementation of {@link HoodieRecordPayload} is
deleted.
+ * We will not do deserialization in this method.
+ */
+ public boolean isDeleted(Schema schema, Properties props) {
+ return isDeletedRecord;
+ }
+
+ /**
+ * Defines whether this implementation of {@link HoodieRecordPayload} could
produce
+ * {@link HoodieRecord#SENTINEL}
+ */
+ public boolean canProduceSentinel() {
+ return false;
+ }
+
+ /**
+ * @param genericRecord instance of {@link GenericRecord} of interest.
+ * @returns {@code true} if record represents a delete record. {@code false}
otherwise.
+ */
+ protected static boolean isDeleteRecord(GenericRecord genericRecord) {
+ final String isDeleteKey = HoodieRecord.HOODIE_IS_DELETED_FIELD;
+ // Modify to be compatible with new version Avro.
+ // The new version Avro throws for GenericRecord.get if the field name
+ // does not exist in the schema.
+ if (genericRecord.getSchema().getField(isDeleteKey) == null) {
+ return false;
+ }
+ Object deleteMarker = genericRecord.get(isDeleteKey);
+ return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
index 5a588eafa5..a218e9dc33 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -51,7 +51,7 @@ public class DefaultHoodieRecordPayload extends
OverwriteWithLatestAvroPayload {
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema, Properties properties) throws IOException {
- if (recordBytes.length == 0) {
+ if (recordBytes.length == 0 || isDeletedRecord) {
return Option.empty();
}
@@ -71,18 +71,18 @@ public class DefaultHoodieRecordPayload extends
OverwriteWithLatestAvroPayload {
/*
* Now check if the incoming record is a delete record.
*/
- return isDeleteRecord(incomingRecord) ? Option.empty() :
Option.of(incomingRecord);
+ return Option.of(incomingRecord);
}
@Override
public Option<IndexedRecord> getInsertValue(Schema schema, Properties
properties) throws IOException {
- if (recordBytes.length == 0) {
+ if (recordBytes.length == 0 || isDeletedRecord) {
return Option.empty();
}
GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes,
schema);
eventTime = updateEventTime(incomingRecord, properties);
- return isDeleteRecord(incomingRecord) ? Option.empty() :
Option.of(incomingRecord);
+ return Option.of(incomingRecord);
}
private static Option<Object> updateEventTime(GenericRecord record,
Properties properties) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java
index 7c8efb66e5..b750cffb6a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java
@@ -46,7 +46,7 @@ public class EventTimeAvroPayload extends
DefaultHoodieRecordPayload {
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema, Properties properties) throws IOException {
- if (recordBytes.length == 0) {
+ if (recordBytes.length == 0 || isDeletedRecord) {
return Option.empty();
}
@@ -61,17 +61,16 @@ public class EventTimeAvroPayload extends
DefaultHoodieRecordPayload {
/*
* Now check if the incoming record is a delete record.
*/
- return isDeleteRecord(incomingRecord) ? Option.empty() :
Option.of(incomingRecord);
+ return Option.of(incomingRecord);
}
@Override
public Option<IndexedRecord> getInsertValue(Schema schema, Properties
properties) throws IOException {
- if (recordBytes.length == 0) {
+ if (recordBytes.length == 0 || isDeletedRecord) {
return Option.empty();
}
- GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
- return isDeleteRecord(incomingRecord) ? Option.empty() :
Option.of(incomingRecord);
+ return Option.of(bytesToAvro(recordBytes, schema));
}
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
index de653054cd..a1318c462c 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroRecord.java
@@ -51,6 +51,15 @@ public class HoodieAvroRecord<T extends HoodieRecordPayload>
extends HoodieRecor
super(record);
}
+ public HoodieAvroRecord(
+ HoodieKey key,
+ T data,
+ HoodieOperation operation,
+ HoodieRecordLocation currentLocation,
+ HoodieRecordLocation newLocation) {
+ super(key, data, operation, currentLocation, newLocation);
+ }
+
public HoodieAvroRecord() {
}
@@ -113,14 +122,14 @@ public class HoodieAvroRecord<T extends
HoodieRecordPayload> extends HoodieRecor
Option<IndexedRecord> avroRecordPayloadOpt =
getData().getInsertValue(recordSchema, props);
GenericRecord avroPayloadInNewSchema =
HoodieAvroUtils.rewriteRecord((GenericRecord)
avroRecordPayloadOpt.get(), targetSchema);
- return new HoodieAvroRecord<>(getKey(), new
RewriteAvroPayload(avroPayloadInNewSchema), getOperation());
+ return new HoodieAvroRecord<>(getKey(), new
RewriteAvroPayload(avroPayloadInNewSchema), getOperation(),
this.currentLocation, this.newLocation);
}
@Override
public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema,
Properties props, Schema newSchema, Map<String, String> renameCols) throws
IOException {
GenericRecord oldRecord = (GenericRecord)
getData().getInsertValue(recordSchema, props).get();
GenericRecord rewriteRecord =
HoodieAvroUtils.rewriteRecordWithNewSchema(oldRecord, newSchema, renameCols);
- return new HoodieAvroRecord<>(getKey(), new
RewriteAvroPayload(rewriteRecord), getOperation());
+ return new HoodieAvroRecord<>(getKey(), new
RewriteAvroPayload(rewriteRecord), getOperation(), this.currentLocation,
this.newLocation);
}
@Override
@@ -133,30 +142,36 @@ public class HoodieAvroRecord<T extends
HoodieRecordPayload> extends HoodieRecor
}
});
- return new HoodieAvroRecord<>(getKey(), new
RewriteAvroPayload(avroRecordPayload), getOperation());
+ return new HoodieAvroRecord<>(getKey(), new
RewriteAvroPayload(avroRecordPayload), getOperation(), this.currentLocation,
this.newLocation);
}
@Override
public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props,
String keyFieldName) throws IOException {
GenericRecord avroRecordPayload = (GenericRecord)
getData().getInsertValue(recordSchema, props).get();
avroRecordPayload.put(keyFieldName, StringUtils.EMPTY_STRING);
- return new HoodieAvroRecord<>(getKey(), new
RewriteAvroPayload(avroRecordPayload), getOperation());
+ return new HoodieAvroRecord<>(getKey(), new
RewriteAvroPayload(avroRecordPayload), getOperation(), this.currentLocation,
this.newLocation);
}
@Override
public boolean isDelete(Schema recordSchema, Properties props) throws
IOException {
- return !getData().getInsertValue(recordSchema, props).isPresent();
+ if (this.data instanceof BaseAvroPayload) {
+ return ((BaseAvroPayload) this.data).isDeleted(recordSchema, props);
+ } else {
+ return !this.data.getInsertValue(recordSchema, props).isPresent();
+ }
}
@Override
public boolean shouldIgnore(Schema recordSchema, Properties props) throws
IOException {
- Option<IndexedRecord> insertRecord =
getData().getInsertValue(recordSchema, props);
- // just skip the ignored record
- if (insertRecord.isPresent() && insertRecord.get().equals(SENTINEL)) {
- return true;
- } else {
- return false;
+ HoodieRecordPayload<?> recordPayload = getData();
+ // NOTE: Currently only records borne by [[ExpressionPayload]] can
currently be ignored,
+ // as such, we limit exposure of this method only to such payloads
+ if (recordPayload instanceof BaseAvroPayload && ((BaseAvroPayload)
recordPayload).canProduceSentinel()) {
+ Option<IndexedRecord> insertRecord =
recordPayload.getInsertValue(recordSchema, props);
+ return insertRecord.isPresent() && insertRecord.get().equals(SENTINEL);
}
+
+ return false;
}
@Override
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 778186d4bc..255a2b2a10 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -124,12 +124,12 @@ public abstract class HoodieRecord<T> implements
HoodieRecordCompatibilityInterf
/**
* Current location of record on storage. Filled in by looking up index
*/
- private HoodieRecordLocation currentLocation;
+ protected HoodieRecordLocation currentLocation;
/**
* New location of record on storage, after written.
*/
- private HoodieRecordLocation newLocation;
+ protected HoodieRecordLocation newLocation;
/**
* Indicates whether the object is sealed.
@@ -154,6 +154,19 @@ public abstract class HoodieRecord<T> implements
HoodieRecordCompatibilityInterf
this.operation = operation;
}
+ public HoodieRecord(
+ HoodieKey key,
+ T data,
+ HoodieOperation operation,
+ HoodieRecordLocation currentLocation,
+ HoodieRecordLocation newLocation) {
+ this.key = key;
+ this.data = data;
+ this.currentLocation = currentLocation;
+ this.newLocation = newLocation;
+ this.operation = operation;
+ }
+
public HoodieRecord(HoodieRecord<T> record) {
this(record.key, record.data, record.operation);
this.currentLocation = record.currentLocation;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
index 5268d76281..a99e3005f1 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
@@ -69,31 +69,11 @@ public class OverwriteWithLatestAvroPayload extends
BaseAvroPayload
@Override
public Option<IndexedRecord> getInsertValue(Schema schema) throws
IOException {
- if (recordBytes.length == 0) {
+ if (recordBytes.length == 0 || isDeletedRecord) {
return Option.empty();
}
- IndexedRecord indexedRecord = HoodieAvroUtils.bytesToAvro(recordBytes,
schema);
- if (isDeleteRecord((GenericRecord) indexedRecord)) {
- return Option.empty();
- } else {
- return Option.of(indexedRecord);
- }
- }
- /**
- * @param genericRecord instance of {@link GenericRecord} of interest.
- * @returns {@code true} if record represents a delete record. {@code false}
otherwise.
- */
- protected boolean isDeleteRecord(GenericRecord genericRecord) {
- final String isDeleteKey = HoodieRecord.HOODIE_IS_DELETED_FIELD;
- // Modify to be compatible with new version Avro.
- // The new version Avro throws for GenericRecord.get if the field name
- // does not exist in the schema.
- if (genericRecord.getSchema().getField(isDeleteKey) == null) {
- return false;
- }
- Object deleteMarker = genericRecord.get(isDeleteKey);
- return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
+ return Option.of((IndexedRecord) HoodieAvroUtils.bytesToAvro(recordBytes,
schema));
}
/**
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
index daa40acc76..7871e45156 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/PartialUpdateAvroPayload.java
@@ -143,7 +143,6 @@ public class PartialUpdateAvroPayload extends
OverwriteNonDefaultsWithLatestAvro
Schema schema,
boolean isOldRecordNewer) throws IOException {
Option<IndexedRecord> recordOption = getInsertValue(schema);
-
if (!recordOption.isPresent()) {
// use natural order for delete record
return Option.empty();
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
index fb850bace7..b7756a490b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/PayloadCreation.java
@@ -86,7 +86,7 @@ public class PayloadCreation implements Serializable {
public HoodieRecordPayload<?> createDeletePayload(BaseAvroPayload payload)
throws Exception {
if (shouldCombine) {
- return (HoodieRecordPayload<?>) constructor.newInstance(null,
payload.orderingVal);
+ return (HoodieRecordPayload<?>) constructor.newInstance(null,
payload.getOrderingVal());
} else {
return (HoodieRecordPayload<?>)
this.constructor.newInstance(Option.empty());
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
index 453cbb4e74..59674b928f 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
+++
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/QuickstartUtils.java
@@ -18,7 +18,9 @@
package org.apache.hudi;
-import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -26,10 +28,6 @@ import
org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericRecord;
import org.apache.spark.sql.Row;
import java.io.IOException;
@@ -239,8 +237,8 @@ public class QuickstartUtils {
private static Option<String> convertToString(HoodieRecord record) {
try {
- String str = HoodieAvroUtils
- .bytesToAvro(((OverwriteWithLatestAvroPayload)
record.getData()).recordBytes, DataGenerator.avroSchema)
+ String str = ((OverwriteWithLatestAvroPayload) record.getData())
+ .getInsertValue(DataGenerator.avroSchema)
.toString();
str = "{" + str.substring(str.indexOf("\"ts\":"));
return Option.of(str.replaceAll("}", ", \"partitionpath\": \"" +
record.getPartitionPath() + "\"}"));
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index 0f11cbf954..5d8e224477 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -24,10 +24,10 @@ import org.apache.hudi.AvroConversionUtils
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro
+import org.apache.hudi.common.model.BaseAvroPayload.isDeleteRecord
import org.apache.hudi.common.model.{DefaultHoodieRecordPayload,
HoodiePayloadProps, HoodieRecord}
import org.apache.hudi.common.util.{ValidationUtils, Option => HOption}
import org.apache.hudi.config.HoodieWriteConfig
-import org.apache.hudi.io.HoodieWriteHandle
import org.apache.hudi.sql.IExpressionEvaluator
import org.apache.spark.sql.avro.{AvroSerializer, SchemaConverters}
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -35,8 +35,8 @@ import org.apache.spark.sql.hudi.SerDeUtils
import
org.apache.spark.sql.hudi.command.payload.ExpressionPayload.{getEvaluator,
getMergedSchema, setWriteSchema}
import org.apache.spark.sql.types.{StructField, StructType}
-import java.util.{Base64, Properties}
import java.util.function.Function
+import java.util.{Base64, Properties}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
@@ -46,7 +46,7 @@ import scala.collection.mutable.ArrayBuffer
* match and not-match actions and compute the final record to write.
*
* If there is no condition match the record, ExpressionPayload will return
- * a HoodieWriteHandle.IGNORE_RECORD, and the write handles will ignore this
record.
+ * a [[HoodieRecord.SENTINEL]], and the write handles will ignore this record.
*/
class ExpressionPayload(record: GenericRecord,
orderingVal: Comparable[_])
@@ -77,11 +77,14 @@ class ExpressionPayload(record: GenericRecord,
processMatchedRecord(joinSqlRecord, Some(targetRecord), properties)
}
+ override def canProduceSentinel: Boolean = true
+
/**
* Process the matched record. Firstly test if the record matched any of the
update-conditions,
* if matched, return the update assignments result. Secondly, test if the
record matched
* delete-condition, if matched then return a delete record. Finally if no
condition matched,
- * return a {@link HoodieWriteHandle.IGNORE_RECORD} which will be ignored by
HoodieWriteHandle.
+ * return a [[HoodieRecord.SENTINEL]] which will be ignored by
HoodieWriteHandle.
+ *
* @param inputRecord The input record to process.
* @param targetRecord The origin exist record.
* @param properties The properties.
@@ -140,7 +143,7 @@ class ExpressionPayload(record: GenericRecord,
/**
* Process the not-matched record. Test if the record matched any of
insert-conditions,
* if matched then return the result of insert-assignment. Or else return a
- * {@link HoodieWriteHandle.IGNORE_RECORD} which will be ignored by
HoodieWriteHandle.
+ * [[HoodieRecord.SENTINEL]] which will be ignored by HoodieWriteHandle.
*
* @param inputRecord The input record to process.
* @param properties The properties.
@@ -173,6 +176,16 @@ class ExpressionPayload(record: GenericRecord,
}
}
+ override def isDeleted(schema: Schema, props: Properties): Boolean = {
+ val deleteConditionText =
props.get(ExpressionPayload.PAYLOAD_DELETE_CONDITION)
+ val isUpdateRecord =
props.getProperty(HoodiePayloadProps.PAYLOAD_IS_UPDATE_RECORD_FOR_MOR,
"false").toBoolean
+ val isDeleteOnCondition= if (isUpdateRecord && deleteConditionText !=
null) {
+ !getInsertValue(schema, props).isPresent
+ } else false
+
+ isDeletedRecord || isDeleteOnCondition
+ }
+
override def getInsertValue(schema: Schema, properties: Properties):
HOption[IndexedRecord] = {
val incomingRecord = bytesToAvro(recordBytes, schema)
if (isDeleteRecord(incomingRecord)) {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
index eb1339ad2f..02cb46721c 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
@@ -110,7 +110,7 @@ class TestHoodieRecordSerialization extends
SparkClientFunctionalTestHarness {
val avroIndexedRecord = new HoodieAvroIndexedRecord(key, avroRecord)
Seq(
- (legacyRecord, 527),
+ (legacyRecord, 528),
(avroIndexedRecord, 389)
) foreach { case (record, expectedSize) => routine(record, expectedSize) }
}