cbg-wx commented on code in PR #17668:
URL: https://github.com/apache/hudi/pull/17668#discussion_r2644930475
##########
hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java:
##########
@@ -44,24 +51,48 @@ public EventTimeAvroPayload(Option<GenericRecord> record) {
this(record.isPresent() ? record.get() : null, 0); // natural order
}
+ @Override
+ public OverwriteWithLatestAvroPayload
preCombine(OverwriteWithLatestAvroPayload oldValue) {
+ if ((recordBytes.length == 0 || isDeletedRecord) && orderingVal.equals(0)){
+ //use natural for delete record
+ return this;
+ }
+ Comparable oldValueOrderingVal = oldValue.orderingVal;
+ Comparable thisOrderingVal = orderingVal;
+ if (thisOrderingVal instanceof Utf8 && oldValueOrderingVal instanceof
String){
+ thisOrderingVal = thisOrderingVal.toString();
+ }
+ if (thisOrderingVal instanceof GenericData.Fixed && oldValueOrderingVal
instanceof BigDecimal){
+ Conversions.DecimalConversion conversion = new
Conversions.DecimalConversion();
+ thisOrderingVal = conversion.fromFixed((GenericData.Fixed)
thisOrderingVal,((GenericData.Fixed)
thisOrderingVal).getSchema(),((GenericData.Fixed)
thisOrderingVal).getSchema().getLogicalType());
+ }
+ if (oldValueOrderingVal.compareTo(thisOrderingVal)>0){
+ return oldValue;
+ }else {
+ return this;
+ }
+ }
+
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema, Properties properties) throws IOException {
/*
* Check if the incoming record is a delete record.
*/
- if (recordBytes.length == 0 || isDeletedRecord) {
- return Option.empty();
- }
-
- GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
+// if (recordBytes.length == 0 || isDeletedRecord) {
Review Comment:
This has been changed in the latest commit.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java:
##########
@@ -77,4 +108,39 @@ public Option<IndexedRecord> getInsertValue(Schema schema,
Properties properties
public Option<Map<String, String>> getMetadata() {
return Option.empty();
}
+
+
+ protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
+ Option<IndexedRecord>
incomingRecord, Properties properties) {
+ /*
+ * Combining strategy here returns currentValue on disk if incoming record
is older.
+ * The incoming record can be either a delete (sent as an upsert with
_hoodie_is_deleted set to true)
+ * or an insert/update record. In any case, if it is older than the record
in disk, the currentValue
+ * in disk is returned (to be rewritten with new commit time).
+ *
+ * NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation
type do not hit this code path
+ * and need to be dealt with separately.
+ */
+ String orderField = ConfigUtils.getOrderingField(properties);
+ if (orderField == null) {
+ return true;
+ }
+ boolean consistentLogicalTimestampEnabled =
Boolean.parseBoolean(properties.getProperty(
+
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
+ Object persistedOrderingVal =
HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue,
+ orderField,
+ true, consistentLogicalTimestampEnabled);
+ Comparable incomingOrderingVal = incomingRecord.map(record-> (Comparable)
HoodieAvroUtils.getNestedFieldVal((GenericRecord) record,
+ orderField,
+ true, consistentLogicalTimestampEnabled)).orElse(orderingVal);
+ if (persistedOrderingVal instanceof Utf8){
Review Comment:
This has been changed in the latest commit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]