This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new 775654ce05de  fix: Fix the merging behavior of delete records in 
payload. (#17713)
775654ce05de is described below

commit 775654ce05de87d595a1fa6d44261d6f0a6c84d5
Author: cbg-wx <[email protected]>
AuthorDate: Tue Jan 27 12:29:42 2026 +0800

     fix: Fix the merging behavior of delete records in payload. (#17713)
    
    * resolve: When the ordering field is of String or Decimal type, and the
    record is marked as a delete record by _hoodie_is_deleted=true, the
    delete record will always be chosen during merging, regardless of the
    ordering value.
    
    * Standardized code
    
    * fix: Fix the merging behavior of delete records in payload, reduce 
unnecessary branches
---
 .../common/model/DefaultHoodieRecordPayload.java   |  31 +++--
 .../hudi/common/model/EventTimeAvroPayload.java    |  30 +++--
 .../hudi/common/model/FirstValueAvroPayload.java   |  16 ++-
 .../table/log/HoodieMergedLogRecordScanner.java    |   5 +
 .../hudi/common/util/OrderingValueUtils.java       |  68 +++++++++++
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 132 +++++++++++++++++++++
 .../hudi/command/payload/ExpressionPayload.scala   |   2 +-
 7 files changed, 259 insertions(+), 25 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
index daa1dcb0207f..dd3ac663d048 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -20,6 +20,8 @@ package org.apache.hudi.common.model;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.OrderingValueUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -49,6 +51,8 @@ public class DefaultHoodieRecordPayload extends 
OverwriteWithLatestAvroPayload {
   private AtomicBoolean isDeleteComputed = new AtomicBoolean(false);
   private boolean isDefaultRecordPayloadDeleted = false;
 
+  protected static final Comparable<?> DEFAULT_VALUE = 0;
+
   public DefaultHoodieRecordPayload(GenericRecord record, Comparable 
orderingVal) {
     super(record, orderingVal);
   }
@@ -59,11 +63,7 @@ public class DefaultHoodieRecordPayload extends 
OverwriteWithLatestAvroPayload {
 
   @Override
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema, Properties properties) throws IOException {
-    if (recordBytes.length == 0) {
-      return Option.empty();
-    }
-
-    GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, 
schema);
+    Option<IndexedRecord> incomingRecord = recordBytes.length == 0 ? 
Option.empty() : Option.of(HoodieAvroUtils.bytesToAvro(recordBytes,schema));
 
     // Null check is needed here to support schema evolution. The record in 
storage may be from old schema where
     // the new ordering column might not be present and hence returns null.
@@ -74,15 +74,17 @@ public class DefaultHoodieRecordPayload extends 
OverwriteWithLatestAvroPayload {
     /*
      * We reached a point where the value is disk is older than the incoming 
record.
      */
-    eventTime = updateEventTime(incomingRecord, properties);
+    if (incomingRecord.isPresent()) {
+      eventTime = updateEventTime((GenericRecord) incomingRecord.get(), 
properties);
+    }
 
     if (!isDeleteComputed.getAndSet(true)) {
-      isDefaultRecordPayloadDeleted = isDeleteRecord(incomingRecord, 
properties);
+      isDefaultRecordPayloadDeleted = incomingRecord.map(record -> 
isDeleteRecord((GenericRecord) record, properties)).orElse(true);
     }
     /*
      * Now check if the incoming record is a delete record.
      */
-    return isDefaultRecordPayloadDeleted ? Option.empty() : 
Option.of(incomingRecord);
+    return isDefaultRecordPayloadDeleted ? Option.empty() : incomingRecord;
   }
 
   @Override
@@ -165,7 +167,7 @@ public class DefaultHoodieRecordPayload extends 
OverwriteWithLatestAvroPayload {
   }
 
   protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
-                                                IndexedRecord incomingRecord, 
Properties properties) {
+                                                Option<IndexedRecord> 
incomingRecord, Properties properties) {
     /*
      * Combining strategy here returns currentValue on disk if incoming record 
is older.
      * The incoming record can be either a delete (sent as an upsert with 
_hoodie_is_deleted set to true)
@@ -185,10 +187,15 @@ public class DefaultHoodieRecordPayload extends 
OverwriteWithLatestAvroPayload {
     Object persistedOrderingVal = 
HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue,
         orderField,
         true, consistentLogicalTimestampEnabled);
-    Comparable incomingOrderingVal = (Comparable) 
HoodieAvroUtils.getNestedFieldVal((GenericRecord) incomingRecord,
+    Comparable incomingOrderingVal = incomingRecord.map(record -> (Comparable) 
HoodieAvroUtils.getNestedFieldVal((GenericRecord) record,
         orderField,
-        true, consistentLogicalTimestampEnabled);
+        true, consistentLogicalTimestampEnabled)).orElse(orderingVal);
+    if (incomingRecord.isEmpty() && DEFAULT_VALUE.equals(incomingOrderingVal)) 
{
+      return true;
+    }
+    Pair<Comparable, Comparable> comparablePair = 
OrderingValueUtils.canonicalizeOrderingValue((Comparable) persistedOrderingVal, 
incomingOrderingVal);
+    persistedOrderingVal = comparablePair.getLeft();
+    incomingOrderingVal = comparablePair.getRight();
     return persistedOrderingVal == null || ((Comparable) 
persistedOrderingVal).compareTo(incomingOrderingVal) <= 0;
   }
-
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java
index a5de6034632c..e8a37caf40e6 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java
@@ -18,11 +18,14 @@
 
 package org.apache.hudi.common.model;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.util.Option;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.OrderingValueUtils;
+import org.apache.hudi.common.util.collection.Pair;
 
 import java.io.IOException;
 import java.util.Map;
@@ -44,24 +47,34 @@ public class EventTimeAvroPayload extends 
DefaultHoodieRecordPayload {
     this(record.isPresent() ? record.get() : null, 0); // natural order
   }
 
+  @Override
+  public OverwriteWithLatestAvroPayload 
preCombine(OverwriteWithLatestAvroPayload oldValue) {
+    if ((recordBytes.length == 0 || isDeletedRecord) && 
DEFAULT_VALUE.equals(orderingVal)) {
+      //use natural for delete record
+      return this;
+    }
+    Pair<Comparable, Comparable> comparablePair = 
OrderingValueUtils.canonicalizeOrderingValue(oldValue.orderingVal, 
this.orderingVal);
+    Comparable oldValueOrderingVal = comparablePair.getLeft();
+    Comparable thisOrderingVal = comparablePair.getRight();
+    if (oldValueOrderingVal.compareTo(thisOrderingVal) > 0) {
+      return oldValue;
+    } else {
+      return this;
+    }
+  }
+
   @Override
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord 
currentValue, Schema schema, Properties properties) throws IOException {
     /*
      * Check if the incoming record is a delete record.
      */
-    if (recordBytes.length == 0 || isDeletedRecord) {
-      return Option.empty();
-    }
-
-    GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
-
+    Option<IndexedRecord> incomingRecord = recordBytes.length == 0 || 
isDeletedRecord ? Option.empty() : 
Option.of(HoodieAvroUtils.bytesToAvro(recordBytes,schema));
     // Null check is needed here to support schema evolution. The record in 
storage may be from old schema where
     // the new ordering column might not be present and hence returns null.
     if (!needUpdatingPersistedRecord(currentValue, incomingRecord, 
properties)) {
       return Option.of(currentValue);
     }
-
-    return Option.of(incomingRecord);
+    return incomingRecord;
   }
 
   @Override
@@ -77,4 +90,5 @@ public class EventTimeAvroPayload extends 
DefaultHoodieRecordPayload {
   public Option<Map<String, String>> getMetadata() {
     return Option.empty();
   }
+
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/FirstValueAvroPayload.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/FirstValueAvroPayload.java
index 33da44e3bccd..83d98317bc19 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/FirstValueAvroPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/FirstValueAvroPayload.java
@@ -23,6 +23,8 @@ import org.apache.avro.generic.IndexedRecord;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.OrderingValueUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import java.util.Properties;
@@ -99,7 +101,7 @@ public class FirstValueAvroPayload extends 
DefaultHoodieRecordPayload {
 
   @Override
   protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
-                                                IndexedRecord incomingRecord, 
Properties properties) {
+                                                Option<IndexedRecord> 
incomingRecord, Properties properties) {
     /*
      * Combining strategy here returns currentValue on disk if incoming record 
is older absolutely.
      * The incoming record can be either a delete (sent as an upsert with 
_hoodie_is_deleted set to true)
@@ -116,9 +118,15 @@ public class FirstValueAvroPayload extends 
DefaultHoodieRecordPayload {
     Object persistedOrderingVal = 
HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue,
             orderField,
             true, consistentLogicalTimestampEnabled);
-    Comparable incomingOrderingVal = (Comparable) 
HoodieAvroUtils.getNestedFieldVal((GenericRecord) incomingRecord,
-            orderField,
-            true, consistentLogicalTimestampEnabled);
+    Comparable incomingOrderingVal = incomingRecord.map(record -> (Comparable) 
HoodieAvroUtils.getNestedFieldVal((GenericRecord) record,
+        orderField,
+        true, consistentLogicalTimestampEnabled)).orElse(orderingVal);
+    if (incomingRecord.isEmpty() && DEFAULT_VALUE.equals(incomingOrderingVal)) 
{
+      return true;
+    }
+    Pair<Comparable, Comparable> comparablePair = 
OrderingValueUtils.canonicalizeOrderingValue((Comparable) persistedOrderingVal, 
incomingOrderingVal);
+    persistedOrderingVal = comparablePair.getLeft();
+    incomingOrderingVal = comparablePair.getRight();
     return persistedOrderingVal == null || ((Comparable) 
persistedOrderingVal).compareTo(incomingOrderingVal) < 0;
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 9106f6b0de5f..a6523dcab69a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -29,6 +29,8 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.OrderingValueUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordUtils;
@@ -282,6 +284,9 @@ public class HoodieMergedLogRecordScanner extends 
AbstractHoodieLogRecordReader
 
       Comparable curOrderingVal = 
oldRecord.getOrderingValue(this.readerSchema, 
this.hoodieTableMetaClient.getTableConfig().getProps());
       Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
+      Pair<Comparable, Comparable> comparablePair = 
OrderingValueUtils.canonicalizeOrderingValue(curOrderingVal, deleteOrderingVal);
+      curOrderingVal = comparablePair.getLeft();
+      deleteOrderingVal = comparablePair.getRight();
       // Checks the ordering value does not equal to 0
       // because we use 0 as the default value which means natural order
       boolean choosePrev = !deleteOrderingVal.equals(0)
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/OrderingValueUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/OrderingValueUtils.java
new file mode 100644
index 000000000000..7b8e2e9fd9e2
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/OrderingValueUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.util.Utf8;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.math.BigDecimal;
+
+/**
+ * Utility class for ordering value canonicalization. Handles compatibility issues between different data types in precombine values.
+ */
+public class OrderingValueUtils {
+  /**
+   * Currently, there are some discrepancies between the types of the ordering 
value for delete record and normal record during merging.
+   * E.g., if the precombine field is of STRING type, the ordering value of 
delete record is a String, while that of normal record is an
+   * Avro UTF8. We should canonicalize the ordering values before merging. 
Specifically, STRING and DECIMAL type should be handled here.
+   * <p>
+   * Processing logic:
+   * 1. Unifies string types: Converts between Utf8 (Avro) ↔ String (Java)
+   * 2. Unifies numeric types: Converts between GenericData.Fixed (Avro) ↔ 
BigDecimal (Java)
+   *
+   * @param oldOrder Existing ordering value in the table (typically Avro 
types)
+   * @param incomingOrder New incoming ordering value (could be Java types or 
Avro types)
+   * @return Pair<Comparable, Comparable> Canonicalized ordering value pair 
with consistent types for comparison
+   */
+  public static Pair<Comparable, Comparable> 
canonicalizeOrderingValue(Comparable oldOrder, Comparable incomingOrder) {
+    if (oldOrder instanceof Utf8 && incomingOrder instanceof String) {
+      // Case 1: Old value is Avro Utf8 type, new value is Java String type
+      // Convert Utf8 to String to unify as Java String type for comparison
+      oldOrder = oldOrder.toString();
+    } else if (incomingOrder instanceof Utf8 && oldOrder instanceof String) {
+      // Case 2: New value is Avro Utf8 type, old value is Java String type
+      // Convert Utf8 to String to unify as Java String type for comparison
+      incomingOrder = incomingOrder.toString();
+    } else if (oldOrder instanceof GenericData.Fixed && incomingOrder 
instanceof BigDecimal) {
+      // Case 3: Old value is Avro Fixed type, new value is Java BigDecimal 
type
+      // Convert Fixed type to BigDecimal to unify as Java BigDecimal type for 
comparison
+      // Fixed type is typically used for storing decimal values (e.g., 
DECIMAL)
+      oldOrder = (BigDecimal) 
HoodieAvroUtils.convertValueForSpecificDataTypes(((GenericData.Fixed) 
oldOrder).getSchema(), oldOrder, false);
+    } else if (incomingOrder instanceof GenericData.Fixed && oldOrder 
instanceof BigDecimal) {
+      // Case 4: New value is Avro Fixed type, old value is Java BigDecimal 
type
+      // Convert Fixed type to BigDecimal to unify as Java BigDecimal type for 
comparison
+      incomingOrder = (BigDecimal) 
HoodieAvroUtils.convertValueForSpecificDataTypes(((GenericData.Fixed) 
incomingOrder).getSchema(), incomingOrder, false);
+    }
+
+    // Return canonicalized ordering value pair ensuring both values have 
consistent types
+    return Pair.of(oldOrder, incomingOrder);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index ebd1222cc1f2..05b97b5aec15 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -63,6 +63,7 @@ import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
+import java.nio.file.Paths;
 import java.time.Instant;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
@@ -483,6 +484,124 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(result, expected, true);
   }
 
+  @ParameterizedTest
+  @MethodSource("sceneTypeAndCompactionEnabled")
+  void testHardDeleteWithStringOrderingField(boolean isDeletedFirst, boolean 
doCompaction) throws Exception {
+    StoragePath storagePath = new 
StoragePath(Paths.get(tempFile.getAbsolutePath().replace("%5C","\\")).toUri());
+    ExecMode execMode = ExecMode.BATCH;
+    String hoodieTableDDL = "create table t1(\n"
+            + "  uuid varchar(20),\n"
+            + "  name varchar(10),\n"
+            + "  age int,\n"
+            + "  _hoodie_is_deleted boolean,\n"
+            + "  `partition` varchar(20),\n"
+            + "  ts STRING,\n"
+            + "  PRIMARY KEY(uuid) NOT ENFORCED\n"
+            + ")\n"
+            + "PARTITIONED BY (`partition`)\n"
+            + "with (\n"
+            + "  'connector' = 'hudi',\n"
+            + "  'table.type' = 'MERGE_ON_READ',\n"
+            + "  'index.type' = 'BUCKET',\n"
+            + "  'path' = '" + storagePath + "',\n"
+            + (doCompaction ?  "  'compaction.delta_commits' = '1',\n" : "")
+            + "  'read.streaming.skip_compaction' = 'false'\n"
+            + ")";
+    batchTableEnv.executeSql(hoodieTableDDL);
+
+    String expected;
+    String insertInto;
+    if (isDeletedFirst) {
+      expected = "["
+              + "+I[id1, Danny, 23, false, par1, 101], "
+              + "+I[id2, Stephen, 33, false, par1, 103]]";
+      // first commit
+      insertInto = "insert into t1 values\n"
+              + "('id1','Danny',23,false,'par1', '101'),\n"
+              + "('id2','Stephen',33,false,'par1', '103')";
+      execInsertSql(batchTableEnv, insertInto);
+      // second commit, hard delete record with smaller order value
+      insertInto = "insert into t1 values\n"
+              + "('id2','Stephen',33, true,'par1', '102')";
+    } else {
+      // first delete record will be ignored during compaction
+      expected = doCompaction
+          ? "[+I[id1, Danny, 23, false, par1, 101], +I[id2, Stephen, 33, 
false, par1, 102]]"
+          : "[+I[id1, Danny, 23, false, par1, 101]]";
+      // first commit
+      insertInto = "insert into t1 values\n"
+              + "('id1','Danny',23,false,'par1', '101'),\n"
+              + "('id2','Stephen',33,true,'par1', '103')";
+      execInsertSql(batchTableEnv, insertInto);
+      // second commit, hard delete record with smaller order value
+      insertInto = "insert into t1 values\n"
+              + "('id2','Stephen',33, false,'par1', '102')";
+    }
+    execInsertSql(batchTableEnv, insertInto);
+    List<Row> result = execSelectSql(batchTableEnv, "select * from t1", 
execMode);
+    // verify the merging result honors the ordering value.
+    assertRowsEquals(result, expected);
+  }
+
+  @ParameterizedTest
+  @MethodSource("sceneTypeAndCompactionEnabled")
+  void testHardDeleteWithDecimalOrderingField(boolean isDeletedFirst, boolean 
doCompaction) throws Exception {
+    StoragePath storagePath = new 
StoragePath(Paths.get(tempFile.getAbsolutePath().replace("%5C","\\")).toUri());
+    ExecMode execMode = ExecMode.BATCH;
+    String hoodieTableDDL = "create table t1(\n"
+            + "  uuid varchar(20),\n"
+            + "  name varchar(10),\n"
+            + "  age int,\n"
+            + "  _hoodie_is_deleted boolean,\n"
+            + "  `partition` varchar(20),\n"
+            + "  ts DECIMAL(7,2),\n"
+            + "  PRIMARY KEY(uuid) NOT ENFORCED\n"
+            + ")\n"
+            + "PARTITIONED BY (`partition`)\n"
+            + "with (\n"
+            + "  'connector' = 'hudi',\n"
+            + "  'table.type' = 'MERGE_ON_READ',\n"
+            + "  'index.type' = 'BUCKET',\n"
+            + "  'path' = '" + storagePath + "',\n"
+            + (doCompaction ?  "  'compaction.delta_commits' = '1',\n" : "")
+            + "  'read.streaming.skip_compaction' = 'false'\n"
+            + ")";
+    batchTableEnv.executeSql(hoodieTableDDL);
+
+    String expected;
+    String insertInto;
+    if (isDeletedFirst) {
+      expected = "["
+              + "+I[id1, Danny, 23, false, par1, 1.10], "
+              + "+I[id2, Stephen, 33, false, par1, 1.30]]";
+      // first commit
+      insertInto = "insert into t1 values\n"
+              + "('id1','Danny',23,false,'par1', 1.10),\n"
+              + "('id2','Stephen',33,false,'par1', 1.30)";
+      execInsertSql(batchTableEnv, insertInto);
+      // second commit, hard delete record with smaller order value
+      insertInto = "insert into t1 values\n"
+              + "('id2','Stephen',33, true,'par1', 1.20)";
+    } else {
+      expected = doCompaction
+          // first delete record will be ignored during compaction
+          ? "[+I[id1, Danny, 23, false, par1, 1.10], +I[id2, Stephen, 33, 
false, par1, 1.20]]"
+          : "[+I[id1, Danny, 23, false, par1, 1.10]]";
+      // first commit
+      insertInto = "insert into t1 values\n"
+              + "('id1','Danny',23,false,'par1', 1.10),\n"
+              + "('id2','Stephen',33,true,'par1', 1.30)";
+      execInsertSql(batchTableEnv, insertInto);
+      // second commit, hard delete record with smaller order value
+      insertInto = "insert into t1 values\n"
+              + "('id2','Stephen',33, false,'par1', 1.20)";
+    }
+    execInsertSql(batchTableEnv, insertInto);
+    List<Row> result2 = execSelectSql(batchTableEnv, "select * from t1", 
execMode);
+    // verify the merging result honors the ordering value.
+    assertRowsEquals(result2, expected);
+  }
+
   @ParameterizedTest
   @MethodSource("tableTypeAndBooleanTrueFalseParams")
   void testStreamReadFilterByPartition(HoodieTableType tableType, boolean 
hiveStylePartitioning) throws Exception {
@@ -2244,6 +2363,19 @@ public class ITTestHoodieDataSource {
     return Stream.of(data).map(Arguments::of);
   }
 
+  /**
+   * Return test params => (SceneType, true/false).
+   */
+  private static Stream<Arguments> sceneTypeAndCompactionEnabled() {
+    Object[][] data =
+        new Object[][] {
+            {true, false},
+            {false, false},
+            {true, true},
+            {false, true}};
+    return Stream.of(data).map(Arguments::of);
+  }
+
   private void execInsertSql(TableEnvironment tEnv, String insert) {
     TableResult tableResult = tEnv.executeSql(insert);
     // wait to finish
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index 0989b8b09aee..cdd5669baf65 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -130,7 +130,7 @@ class ExpressionPayload(@transient record: GenericRecord,
           .serialize(resultingRow)
           .asInstanceOf[GenericRecord]
 
-        if (targetRecord.isEmpty || 
needUpdatingPersistedRecord(targetRecord.get, resultingAvroRecord, properties)) 
{
+        if (targetRecord.isEmpty || 
needUpdatingPersistedRecord(targetRecord.get, HOption.of(resultingAvroRecord), 
properties)) {
           resultRecordOpt = HOption.of(resultingAvroRecord)
         } else {
           // if the PreCombine field value of targetRecord is greater

Reply via email to