nsivabalan commented on a change in pull request #1704:
URL: https://github.com/apache/hudi/pull/1704#discussion_r465956954
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/model/OverwriteWithLatestAvroPayload.java
##########
@@ -91,4 +94,37 @@ private boolean isDeleteRecord(GenericRecord genericRecord) {
Object deleteMarker = genericRecord.get(deleteMarkerField);
return (deleteMarker instanceof Boolean && (boolean) deleteMarker);
}
+
+ @Override
+ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema, Map<String, String> props) throws IOException {
+ if (recordBytes.length == 0) {
+ return Option.empty();
+ }
+ GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
+ /*
+ * Combining strategy here returns currentValue on disk if incoming record
is older.
+ * The incoming record can be either a delete (sent as an upsert with
_hoodie_is_deleted set to true)
+ * or an insert/update record. In any case, if it is older than the record
in disk, the currentValue
+ * in disk is returned (to be rewritten with new commit time).
+ *
+ * NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation
type do not hit this code path
+ * and need to be dealt with separately.
+ */
+ Comparable persistedOrderingVal = (Comparable)
getNestedFieldVal((GenericRecord) currentValue,
props.get(ORDERING_FIELD_OPT_KEY), false);
+ Comparable incomingOrderingVal = (Comparable)
getNestedFieldVal(incomingRecord, props.get(ORDERING_FIELD_OPT_KEY), false);
Review comment:
if schema got evolved and the ordering field for incoming record does
not exist in existing storage, this might throw NPE.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]