cshuo commented on code in PR #17668:
URL: https://github.com/apache/hudi/pull/17668#discussion_r2642304334


##########
hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java:
##########
@@ -44,24 +51,48 @@ public EventTimeAvroPayload(Option<GenericRecord> record) {
     this(record.isPresent() ? record.get() : null, 0); // natural order
   }
 
+  @Override
+  public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue) {
+    if ((recordBytes.length == 0 || isDeletedRecord) && orderingVal.equals(0)){
+      //use natural for delete record
+      return this;
+    }
+    Comparable oldValueOrderingVal = oldValue.orderingVal;
+    Comparable thisOrderingVal = orderingVal;
+    if (thisOrderingVal instanceof Utf8 && oldValueOrderingVal instanceof String){
+      thisOrderingVal = thisOrderingVal.toString();
+    }
+    if (thisOrderingVal instanceof GenericData.Fixed && oldValueOrderingVal instanceof BigDecimal){
+      Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();
+      thisOrderingVal = conversion.fromFixed((GenericData.Fixed) thisOrderingVal,((GenericData.Fixed) thisOrderingVal).getSchema(),((GenericData.Fixed) thisOrderingVal).getSchema().getLogicalType());
+    }
+    if (oldValueOrderingVal.compareTo(thisOrderingVal)>0){
+      return oldValue;
+    }else {
+      return this;
+    }
+  }
+
   @Override
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
     /*
      * Check if the incoming record is a delete record.
      */
-    if (recordBytes.length == 0 || isDeletedRecord) {
-      return Option.empty();
-    }
-
-    GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
+//    if (recordBytes.length == 0 || isDeletedRecord) {

Review Comment:
   This commented-out code can be removed.



##########
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java:
##########
@@ -480,7 +480,259 @@ void testStreamReadWithDeletes() throws Exception {
     final String expected = "[+I(+I[Danny, 24]), +I(+I[Stephen, 34])]";
     assertRowsEquals(result, expected, true);
   }
+  //scene01 compaction off
+  @Test

Review Comment:
   Let's use `@ParameterizedTest` to merge these tests. It seems we only need
two tests: string ordering fields with compaction off/on, and decimal ordering
fields with compaction off/on.



##########
hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java:
##########
@@ -77,4 +108,39 @@ public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties
   public Option<Map<String, String>> getMetadata() {
     return Option.empty();
   }
+
+
+  protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
+                                                Option<IndexedRecord> incomingRecord, Properties properties) {
+    /*
+     * Combining strategy here returns currentValue on disk if incoming record is older.
+     * The incoming record can be either a delete (sent as an upsert with _hoodie_is_deleted set to true)
+     * or an insert/update record. In any case, if it is older than the record in disk, the currentValue
+     * in disk is returned (to be rewritten with new commit time).
+     *
+     * NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation type do not hit this code path
+     * and need to be dealt with separately.
+     */
+    String orderField = ConfigUtils.getOrderingField(properties);
+    if (orderField == null) {
+      return true;
+    }
+    boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(properties.getProperty(
+            KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+            KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
+    Object persistedOrderingVal = HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue,
+            orderField,
+            true, consistentLogicalTimestampEnabled);
+    Comparable incomingOrderingVal = incomingRecord.map(record-> (Comparable) HoodieAvroUtils.getNestedFieldVal((GenericRecord) record,
+            orderField,
+            true, consistentLogicalTimestampEnabled)).orElse(orderingVal);
+    if (persistedOrderingVal instanceof Utf8){
+      persistedOrderingVal=persistedOrderingVal.toString();
+    }
+    if (persistedOrderingVal instanceof GenericData.Fixed){

Review Comment:
   For the decimal type, it seems we can use
`HoodieAvroUtils#convertValueForSpecificDataTypes` directly.



##########
hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java:
##########
@@ -44,24 +51,48 @@ public EventTimeAvroPayload(Option<GenericRecord> record) {
     this(record.isPresent() ? record.get() : null, 0); // natural order
   }
 
+  @Override
+  public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue) {
+    if ((recordBytes.length == 0 || isDeletedRecord) && orderingVal.equals(0)){
+      //use natural for delete record
+      return this;
+    }
+    Comparable oldValueOrderingVal = oldValue.orderingVal;
+    Comparable thisOrderingVal = orderingVal;
+    if (thisOrderingVal instanceof Utf8 && oldValueOrderingVal instanceof String){
+      thisOrderingVal = thisOrderingVal.toString();
+    }
+    if (thisOrderingVal instanceof GenericData.Fixed && oldValueOrderingVal instanceof BigDecimal){
+      Conversions.DecimalConversion conversion = new Conversions.DecimalConversion();
+      thisOrderingVal = conversion.fromFixed((GenericData.Fixed) thisOrderingVal,((GenericData.Fixed) thisOrderingVal).getSchema(),((GenericData.Fixed) thisOrderingVal).getSchema().getLogicalType());
+    }
+    if (oldValueOrderingVal.compareTo(thisOrderingVal)>0){
+      return oldValue;
+    }else {
+      return this;
+    }
+  }
+
   @Override
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
     /*
      * Check if the incoming record is a delete record.
      */
-    if (recordBytes.length == 0 || isDeletedRecord) {
-      return Option.empty();
-    }
-
-    GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
+//    if (recordBytes.length == 0 || isDeletedRecord) {
+//      return Option.empty();
+//    }
+//
+//    GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
+    Option<IndexedRecord> incomingRecord = recordBytes.length==0 || isDeletedRecord ? Option.empty() : Option.of(bytesToAvro(recordBytes,schema));
 
     // Null check is needed here to support schema evolution. The record in storage may be from old schema where
     // the new ordering column might not be present and hence returns null.
     if (!needUpdatingPersistedRecord(currentValue, incomingRecord, properties)) {
       return Option.of(currentValue);
     }
 
-    return Option.of(incomingRecord);
+//    return Option.of(incomingRecord);

Review Comment:
   These commented-out lines can be removed.



##########
hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java:
##########
@@ -77,4 +108,39 @@ public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties
   public Option<Map<String, String>> getMetadata() {
     return Option.empty();
   }
+
+
+  protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
+                                                Option<IndexedRecord> incomingRecord, Properties properties) {
+    /*
+     * Combining strategy here returns currentValue on disk if incoming record is older.
+     * The incoming record can be either a delete (sent as an upsert with _hoodie_is_deleted set to true)
+     * or an insert/update record. In any case, if it is older than the record in disk, the currentValue
+     * in disk is returned (to be rewritten with new commit time).
+     *
+     * NOTE: Deletes sent via EmptyHoodieRecordPayload and/or Delete operation type do not hit this code path
+     * and need to be dealt with separately.
+     */
+    String orderField = ConfigUtils.getOrderingField(properties);
+    if (orderField == null) {
+      return true;
+    }
+    boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(properties.getProperty(
+            KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
+            KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
+    Object persistedOrderingVal = HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue,
+            orderField,
+            true, consistentLogicalTimestampEnabled);
+    Comparable incomingOrderingVal = incomingRecord.map(record-> (Comparable) HoodieAvroUtils.getNestedFieldVal((GenericRecord) record,
+            orderField,
+            true, consistentLogicalTimestampEnabled)).orElse(orderingVal);
+    if (persistedOrderingVal instanceof Utf8){

Review Comment:
   There are 3 places in this PR with this logic. Maybe we can provide a
common util function to handle it, like `Pair<Comparable, Comparable>
canonicalizeOrderingValue(Comparable oldOrder, Comparable incomingOrder)`.
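
   A minimal, stdlib-only sketch of what such a helper could look like. Everything
here is an assumption for illustration, not Hudi or Avro API: the class name
`OrderingValues`, the `FixedDecimal` stand-in for a decimal-typed
`GenericData.Fixed`, `CharSequence` standing in for `Utf8` (which implements
`CharSequence`), and `Map.Entry` in place of Hudi's `Pair`.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

/** Hypothetical helper; names and shapes are illustrative only. */
final class OrderingValues {

  /** Stand-in for a fixed decimal: big-endian two's-complement unscaled bytes plus a scale. */
  static final class FixedDecimal {
    final byte[] unscaledBytes;
    final int scale;

    FixedDecimal(byte[] unscaledBytes, int scale) {
      this.unscaledBytes = unscaledBytes.clone();
      this.scale = scale;
    }
  }

  private OrderingValues() {
  }

  /** Maps a single ordering value to a plain Java type that compares consistently. */
  @SuppressWarnings("rawtypes")
  static Comparable canonicalize(Object value) {
    if (value instanceof FixedDecimal) {
      FixedDecimal fixed = (FixedDecimal) value;
      return new BigDecimal(new BigInteger(fixed.unscaledBytes), fixed.scale);
    }
    if (value instanceof CharSequence && !(value instanceof String)) {
      // Non-String character sequences (e.g. a Utf8-like type) become String.
      return value.toString();
    }
    // Anything else is assumed to be Comparable already (or null).
    return (Comparable) value;
  }

  /** Canonicalizes both sides in one call so every call site shares the same rules. */
  @SuppressWarnings("rawtypes")
  static Map.Entry<Comparable, Comparable> canonicalizeOrderingValue(Object oldOrder, Object incomingOrder) {
    return new SimpleImmutableEntry<>(canonicalize(oldOrder), canonicalize(incomingOrder));
  }
}
```

   With something like this in place, each of the three call sites would only
canonicalize the pair and then call `compareTo` on the results.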



##########
hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java:
##########
@@ -77,4 +108,39 @@ public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties
   public Option<Map<String, String>> getMetadata() {
     return Option.empty();
   }
+
+
+  protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,

Review Comment:
   We can modify `needUpdatingPersistedRecord` in `DefaultHoodieRecordPayload`
directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.