cbg-wx commented on code in PR #17668:
URL: https://github.com/apache/hudi/pull/17668#discussion_r2644930475


##########
hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java:
##########
@@ -44,24 +51,48 @@ public EventTimeAvroPayload(Option<GenericRecord> record) {
     this(record.isPresent() ? record.get() : null, 0); // natural order
   }
 
+  @Override
+  public OverwriteWithLatestAvroPayload 
preCombine(OverwriteWithLatestAvroPayload oldValue) {
+    if ((recordBytes.length == 0 || isDeletedRecord) && orderingVal.equals(0)){
+      //use natural for delete record
+      return this;
+    }
+    Comparable oldValueOrderingVal = oldValue.orderingVal;
+    Comparable thisOrderingVal = orderingVal;
+    if (thisOrderingVal instanceof Utf8 && oldValueOrderingVal instanceof 
String){
+      thisOrderingVal = thisOrderingVal.toString();
+    }
+    if (thisOrderingVal instanceof GenericData.Fixed && oldValueOrderingVal 
instanceof BigDecimal){
+      Conversions.DecimalConversion conversion = new 
Conversions.DecimalConversion();
+      thisOrderingVal = conversion.fromFixed((GenericData.Fixed) 
thisOrderingVal,((GenericData.Fixed) 
thisOrderingVal).getSchema(),((GenericData.Fixed) 
thisOrderingVal).getSchema().getLogicalType());
+    }
+    if (oldValueOrderingVal.compareTo(thisOrderingVal)>0){
+      return oldValue;
+    }else {
+      return this;
+    }
+  }
+
   @Override
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema, Properties properties) throws IOException {
     /*
      * Check if the incoming record is a delete record.
      */
-    if (recordBytes.length == 0 || isDeletedRecord) {
-      return Option.empty();
-    }
-
-    GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
+//    if (recordBytes.length == 0 || isDeletedRecord) {

Review Comment:
   This has been addressed in the latest commit.



##########
hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java:
##########
@@ -77,4 +108,39 @@ public Option<IndexedRecord> getInsertValue(Schema schema, 
Properties properties
   public Option<Map<String, String>> getMetadata() {
     return Option.empty();
   }
+
+
+  protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
+                                                Option<IndexedRecord> 
incomingRecord, Properties properties) {
+    /*
+     * Combining strategy here returns currentValue on disk if incoming record 
is older.
+     * The incoming record can be either a delete (sent as an upsert with 
_hoodie_is_deleted set to true)
+     * or an insert/update record. In any case, if it is older than the record 
in disk, the currentValue
+     * in disk is returned (to be rewritten with new commit time).
+     *
+     * NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation 
type do not hit this code path
+     * and need to be dealt with separately.
+     */
+    String orderField = ConfigUtils.getOrderingField(properties);
+    if (orderField == null) {
+      return true;
+    }
+    boolean consistentLogicalTimestampEnabled = 
Boolean.parseBoolean(properties.getProperty(
+            
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+            
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
+    Object persistedOrderingVal = 
HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue,
+            orderField,
+            true, consistentLogicalTimestampEnabled);
+    Comparable incomingOrderingVal = incomingRecord.map(record-> (Comparable) 
HoodieAvroUtils.getNestedFieldVal((GenericRecord) record,
+            orderField,
+            true, consistentLogicalTimestampEnabled)).orElse(orderingVal);
+    if (persistedOrderingVal instanceof Utf8){

Review Comment:
   This has been addressed in the latest commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to