This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new 775654ce05de fix: Fix the merging behavior of delete records in
payload. (#17713)
775654ce05de is described below
commit 775654ce05de87d595a1fa6d44261d6f0a6c84d5
Author: cbg-wx <[email protected]>
AuthorDate: Tue Jan 27 12:29:42 2026 +0800
fix: Fix the merging behavior of delete records in payload. (#17713)
* resolve: When the ordering field is of String type or Decimal type, and the
record is marked as a delete record by _hoodie_is_deleted=true, the
delete record will always be chosen during merging, regardless of the
ordering value.
* Standardized code
* fix: Fix the merging behavior of delete records in payload, reduce
unnecessary branches
---
.../common/model/DefaultHoodieRecordPayload.java | 31 +++--
.../hudi/common/model/EventTimeAvroPayload.java | 30 +++--
.../hudi/common/model/FirstValueAvroPayload.java | 16 ++-
.../table/log/HoodieMergedLogRecordScanner.java | 5 +
.../hudi/common/util/OrderingValueUtils.java | 68 +++++++++++
.../apache/hudi/table/ITTestHoodieDataSource.java | 132 +++++++++++++++++++++
.../hudi/command/payload/ExpressionPayload.scala | 2 +-
7 files changed, 259 insertions(+), 25 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
index daa1dcb0207f..dd3ac663d048 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -20,6 +20,8 @@ package org.apache.hudi.common.model;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.OrderingValueUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
@@ -49,6 +51,8 @@ public class DefaultHoodieRecordPayload extends
OverwriteWithLatestAvroPayload {
private AtomicBoolean isDeleteComputed = new AtomicBoolean(false);
private boolean isDefaultRecordPayloadDeleted = false;
+ protected static final Comparable<?> DEFAULT_VALUE = 0;
+
public DefaultHoodieRecordPayload(GenericRecord record, Comparable
orderingVal) {
super(record, orderingVal);
}
@@ -59,11 +63,7 @@ public class DefaultHoodieRecordPayload extends
OverwriteWithLatestAvroPayload {
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema, Properties properties) throws IOException {
- if (recordBytes.length == 0) {
- return Option.empty();
- }
-
- GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes,
schema);
+ Option<IndexedRecord> incomingRecord = recordBytes.length == 0 ?
Option.empty() : Option.of(HoodieAvroUtils.bytesToAvro(recordBytes,schema));
// Null check is needed here to support schema evolution. The record in
storage may be from old schema where
// the new ordering column might not be present and hence returns null.
@@ -74,15 +74,17 @@ public class DefaultHoodieRecordPayload extends
OverwriteWithLatestAvroPayload {
/*
* We reached a point where the value is disk is older than the incoming
record.
*/
- eventTime = updateEventTime(incomingRecord, properties);
+ if (incomingRecord.isPresent()) {
+ eventTime = updateEventTime((GenericRecord) incomingRecord.get(),
properties);
+ }
if (!isDeleteComputed.getAndSet(true)) {
- isDefaultRecordPayloadDeleted = isDeleteRecord(incomingRecord,
properties);
+ isDefaultRecordPayloadDeleted = incomingRecord.map(record ->
isDeleteRecord((GenericRecord) record, properties)).orElse(true);
}
/*
* Now check if the incoming record is a delete record.
*/
- return isDefaultRecordPayloadDeleted ? Option.empty() :
Option.of(incomingRecord);
+ return isDefaultRecordPayloadDeleted ? Option.empty() : incomingRecord;
}
@Override
@@ -165,7 +167,7 @@ public class DefaultHoodieRecordPayload extends
OverwriteWithLatestAvroPayload {
}
protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
- IndexedRecord incomingRecord,
Properties properties) {
+ Option<IndexedRecord>
incomingRecord, Properties properties) {
/*
* Combining strategy here returns currentValue on disk if incoming record
is older.
* The incoming record can be either a delete (sent as an upsert with
_hoodie_is_deleted set to true)
@@ -185,10 +187,15 @@ public class DefaultHoodieRecordPayload extends
OverwriteWithLatestAvroPayload {
Object persistedOrderingVal =
HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue,
orderField,
true, consistentLogicalTimestampEnabled);
- Comparable incomingOrderingVal = (Comparable)
HoodieAvroUtils.getNestedFieldVal((GenericRecord) incomingRecord,
+ Comparable incomingOrderingVal = incomingRecord.map(record -> (Comparable)
HoodieAvroUtils.getNestedFieldVal((GenericRecord) record,
orderField,
- true, consistentLogicalTimestampEnabled);
+ true, consistentLogicalTimestampEnabled)).orElse(orderingVal);
+ if (incomingRecord.isEmpty() && DEFAULT_VALUE.equals(incomingOrderingVal))
{
+ return true;
+ }
+ Pair<Comparable, Comparable> comparablePair =
OrderingValueUtils.canonicalizeOrderingValue((Comparable) persistedOrderingVal,
incomingOrderingVal);
+ persistedOrderingVal = comparablePair.getLeft();
+ incomingOrderingVal = comparablePair.getRight();
return persistedOrderingVal == null || ((Comparable)
persistedOrderingVal).compareTo(incomingOrderingVal) <= 0;
}
-
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java
index a5de6034632c..e8a37caf40e6 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java
@@ -18,11 +18,14 @@
package org.apache.hudi.common.model;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.Option;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.OrderingValueUtils;
+import org.apache.hudi.common.util.collection.Pair;
import java.io.IOException;
import java.util.Map;
@@ -44,24 +47,34 @@ public class EventTimeAvroPayload extends
DefaultHoodieRecordPayload {
this(record.isPresent() ? record.get() : null, 0); // natural order
}
+ @Override
+ public OverwriteWithLatestAvroPayload
preCombine(OverwriteWithLatestAvroPayload oldValue) {
+ if ((recordBytes.length == 0 || isDeletedRecord) &&
DEFAULT_VALUE.equals(orderingVal)) {
+ //use natural for delete record
+ return this;
+ }
+ Pair<Comparable, Comparable> comparablePair =
OrderingValueUtils.canonicalizeOrderingValue(oldValue.orderingVal,
this.orderingVal);
+ Comparable oldValueOrderingVal = comparablePair.getLeft();
+ Comparable thisOrderingVal = comparablePair.getRight();
+ if (oldValueOrderingVal.compareTo(thisOrderingVal) > 0) {
+ return oldValue;
+ } else {
+ return this;
+ }
+ }
+
@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord
currentValue, Schema schema, Properties properties) throws IOException {
/*
* Check if the incoming record is a delete record.
*/
- if (recordBytes.length == 0 || isDeletedRecord) {
- return Option.empty();
- }
-
- GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
-
+ Option<IndexedRecord> incomingRecord = recordBytes.length == 0 ||
isDeletedRecord ? Option.empty() :
Option.of(HoodieAvroUtils.bytesToAvro(recordBytes,schema));
// Null check is needed here to support schema evolution. The record in
storage may be from old schema where
// the new ordering column might not be present and hence returns null.
if (!needUpdatingPersistedRecord(currentValue, incomingRecord,
properties)) {
return Option.of(currentValue);
}
-
- return Option.of(incomingRecord);
+ return incomingRecord;
}
@Override
@@ -77,4 +90,5 @@ public class EventTimeAvroPayload extends
DefaultHoodieRecordPayload {
public Option<Map<String, String>> getMetadata() {
return Option.empty();
}
+
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/model/FirstValueAvroPayload.java
b/hudi-common/src/main/java/org/apache/hudi/common/model/FirstValueAvroPayload.java
index 33da44e3bccd..83d98317bc19 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/model/FirstValueAvroPayload.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/model/FirstValueAvroPayload.java
@@ -23,6 +23,8 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.OrderingValueUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import java.util.Properties;
@@ -99,7 +101,7 @@ public class FirstValueAvroPayload extends
DefaultHoodieRecordPayload {
@Override
protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
- IndexedRecord incomingRecord,
Properties properties) {
+ Option<IndexedRecord>
incomingRecord, Properties properties) {
/*
* Combining strategy here returns currentValue on disk if incoming record
is older absolutely.
* The incoming record can be either a delete (sent as an upsert with
_hoodie_is_deleted set to true)
@@ -116,9 +118,15 @@ public class FirstValueAvroPayload extends
DefaultHoodieRecordPayload {
Object persistedOrderingVal =
HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue,
orderField,
true, consistentLogicalTimestampEnabled);
- Comparable incomingOrderingVal = (Comparable)
HoodieAvroUtils.getNestedFieldVal((GenericRecord) incomingRecord,
- orderField,
- true, consistentLogicalTimestampEnabled);
+ Comparable incomingOrderingVal = incomingRecord.map(record -> (Comparable)
HoodieAvroUtils.getNestedFieldVal((GenericRecord) record,
+ orderField,
+ true, consistentLogicalTimestampEnabled)).orElse(orderingVal);
+ if (incomingRecord.isEmpty() && DEFAULT_VALUE.equals(incomingOrderingVal))
{
+ return true;
+ }
+ Pair<Comparable, Comparable> comparablePair =
OrderingValueUtils.canonicalizeOrderingValue((Comparable) persistedOrderingVal,
incomingOrderingVal);
+ persistedOrderingVal = comparablePair.getLeft();
+ incomingOrderingVal = comparablePair.getRight();
return persistedOrderingVal == null || ((Comparable)
persistedOrderingVal).compareTo(incomingOrderingVal) < 0;
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 9106f6b0de5f..a6523dcab69a 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -29,6 +29,8 @@ import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.OrderingValueUtils;
+import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordUtils;
@@ -282,6 +284,9 @@ public class HoodieMergedLogRecordScanner extends
AbstractHoodieLogRecordReader
Comparable curOrderingVal =
oldRecord.getOrderingValue(this.readerSchema,
this.hoodieTableMetaClient.getTableConfig().getProps());
Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
+ Pair<Comparable, Comparable> comparablePair =
OrderingValueUtils.canonicalizeOrderingValue(curOrderingVal, deleteOrderingVal);
+ curOrderingVal = comparablePair.getLeft();
+ deleteOrderingVal = comparablePair.getRight();
// Checks the ordering value does not equal to 0
// because we use 0 as the default value which means natural order
boolean choosePrev = !deleteOrderingVal.equals(0)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/OrderingValueUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/OrderingValueUtils.java
new file mode 100644
index 000000000000..7b8e2e9fd9e2
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/util/OrderingValueUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.util.Utf8;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.math.BigDecimal;
+
+/**
+ * Utility class for ordering value canonicalization. Handle compatibility
issues between different data types in precombine values.
+ */
+public class OrderingValueUtils {
+ /**
+ * Currently, there are some discrepancies between the types of the ordering
value for delete record and normal record during merging.
+ * E.g., if the precombine field is of STRING type, the ordering value of
delete record is a String, while that of normal record is an
+ * Avro UTF8. We should canonicalize the ordering values before merging.
Specifically, STRING and DECIMAL type should be handled here.
+ * <p>
+ * Processing logic:
+ * 1. Unifies string types: Converts between Utf8 (Avro) ↔ String (Java)
+ * 2. Unifies numeric types: Converts between GenericData.Fixed (Avro) ↔
BigDecimal (Java)
+ *
+ * @param oldOrder Existing ordering value in the table (typically Avro
types)
+ * @param incomingOrder New incoming ordering value (could be Java types or
Avro types)
+ * @return Pair<Comparable, Comparable> Canonicalized ordering value pair
with consistent types for comparison
+ */
+ public static Pair<Comparable, Comparable>
canonicalizeOrderingValue(Comparable oldOrder, Comparable incomingOrder) {
+ if (oldOrder instanceof Utf8 && incomingOrder instanceof String) {
+ // Case 1: Old value is Avro Utf8 type, new value is Java String type
+ // Convert Utf8 to String to unify as Java String type for comparison
+ oldOrder = oldOrder.toString();
+ } else if (incomingOrder instanceof Utf8 && oldOrder instanceof String) {
+ // Case 2: New value is Avro Utf8 type, old value is Java String type
+ // Convert Utf8 to String to unify as Java String type for comparison
+ incomingOrder = incomingOrder.toString();
+ } else if (oldOrder instanceof GenericData.Fixed && incomingOrder
instanceof BigDecimal) {
+ // Case 3: Old value is Avro Fixed type, new value is Java BigDecimal
type
+ // Convert Fixed type to BigDecimal to unify as Java BigDecimal type for
comparison
+ // Fixed type is typically used for storing decimal values (e.g.,
DECIMAL)
+ oldOrder = (BigDecimal)
HoodieAvroUtils.convertValueForSpecificDataTypes(((GenericData.Fixed)
oldOrder).getSchema(), oldOrder, false);
+ } else if (incomingOrder instanceof GenericData.Fixed && oldOrder
instanceof BigDecimal) {
+ // Case 4: New value is Avro Fixed type, old value is Java BigDecimal
type
+ // Convert Fixed type to BigDecimal to unify as Java BigDecimal type for
comparison
+ incomingOrder = (BigDecimal)
HoodieAvroUtils.convertValueForSpecificDataTypes(((GenericData.Fixed)
incomingOrder).getSchema(), incomingOrder, false);
+ }
+
+ // Return canonicalized ordering value pair ensuring both values have
consistent types
+ return Pair.of(oldOrder, incomingOrder);
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index ebd1222cc1f2..05b97b5aec15 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -63,6 +63,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
+import java.nio.file.Paths;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
@@ -483,6 +484,124 @@ public class ITTestHoodieDataSource {
assertRowsEquals(result, expected, true);
}
+ @ParameterizedTest
+ @MethodSource("sceneTypeAndCompactionEnabled")
+ void testHardDeleteWithStringOrderingField(boolean isDeletedFirst, boolean
doCompaction) throws Exception {
+ StoragePath storagePath = new
StoragePath(Paths.get(tempFile.getAbsolutePath().replace("%5C","\\")).toUri());
+ ExecMode execMode = ExecMode.BATCH;
+ String hoodieTableDDL = "create table t1(\n"
+ + " uuid varchar(20),\n"
+ + " name varchar(10),\n"
+ + " age int,\n"
+ + " _hoodie_is_deleted boolean,\n"
+ + " `partition` varchar(20),\n"
+ + " ts STRING,\n"
+ + " PRIMARY KEY(uuid) NOT ENFORCED\n"
+ + ")\n"
+ + "PARTITIONED BY (`partition`)\n"
+ + "with (\n"
+ + " 'connector' = 'hudi',\n"
+ + " 'table.type' = 'MERGE_ON_READ',\n"
+ + " 'index.type' = 'BUCKET',\n"
+ + " 'path' = '" + storagePath + "',\n"
+ + (doCompaction ? " 'compaction.delta_commits' = '1',\n" : "")
+ + " 'read.streaming.skip_compaction' = 'false'\n"
+ + ")";
+ batchTableEnv.executeSql(hoodieTableDDL);
+
+ String expected;
+ String insertInto;
+ if (isDeletedFirst) {
+ expected = "["
+ + "+I[id1, Danny, 23, false, par1, 101], "
+ + "+I[id2, Stephen, 33, false, par1, 103]]";
+ // first commit
+ insertInto = "insert into t1 values\n"
+ + "('id1','Danny',23,false,'par1', '101'),\n"
+ + "('id2','Stephen',33,false,'par1', '103')";
+ execInsertSql(batchTableEnv, insertInto);
+ // second commit, hard delete record with smaller order value
+ insertInto = "insert into t1 values\n"
+ + "('id2','Stephen',33, true,'par1', '102')";
+ } else {
+ // first delete record will be ignored during compaction
+ expected = doCompaction
+ ? "[+I[id1, Danny, 23, false, par1, 101], +I[id2, Stephen, 33,
false, par1, 102]]"
+ : "[+I[id1, Danny, 23, false, par1, 101]]";
+ // first commit
+ insertInto = "insert into t1 values\n"
+ + "('id1','Danny',23,false,'par1', '101'),\n"
+ + "('id2','Stephen',33,true,'par1', '103')";
+ execInsertSql(batchTableEnv, insertInto);
+ // second commit, hard delete record with smaller order value
+ insertInto = "insert into t1 values\n"
+ + "('id2','Stephen',33, false,'par1', '102')";
+ }
+ execInsertSql(batchTableEnv, insertInto);
+ List<Row> result = execSelectSql(batchTableEnv, "select * from t1",
execMode);
+ // no record is deleted.
+ assertRowsEquals(result, expected);
+ }
+
+ @ParameterizedTest
+ @MethodSource("sceneTypeAndCompactionEnabled")
+ void testHardDeleteWithDecimalOrderingField(boolean isDeletedFirst, boolean
doCompaction) throws Exception {
+ StoragePath storagePath = new
StoragePath(Paths.get(tempFile.getAbsolutePath().replace("%5C","\\")).toUri());
+ ExecMode execMode = ExecMode.BATCH;
+ String hoodieTableDDL = "create table t1(\n"
+ + " uuid varchar(20),\n"
+ + " name varchar(10),\n"
+ + " age int,\n"
+ + " _hoodie_is_deleted boolean,\n"
+ + " `partition` varchar(20),\n"
+ + " ts DECIMAL(7,2),\n"
+ + " PRIMARY KEY(uuid) NOT ENFORCED\n"
+ + ")\n"
+ + "PARTITIONED BY (`partition`)\n"
+ + "with (\n"
+ + " 'connector' = 'hudi',\n"
+ + " 'table.type' = 'MERGE_ON_READ',\n"
+ + " 'index.type' = 'BUCKET',\n"
+ + " 'path' = '" + storagePath + "',\n"
+ + (doCompaction ? " 'compaction.delta_commits' = '1',\n" : "")
+ + " 'read.streaming.skip_compaction' = 'false'\n"
+ + ")";
+ batchTableEnv.executeSql(hoodieTableDDL);
+
+ String expected;
+ String insertInto;
+ if (isDeletedFirst) {
+ expected = "["
+ + "+I[id1, Danny, 23, false, par1, 1.10], "
+ + "+I[id2, Stephen, 33, false, par1, 1.30]]";
+ // first commit
+ insertInto = "insert into t1 values\n"
+ + "('id1','Danny',23,false,'par1', 1.10),\n"
+ + "('id2','Stephen',33,false,'par1', 1.30)";
+ execInsertSql(batchTableEnv, insertInto);
+ // second commit, hard delete record with smaller order value
+ insertInto = "insert into t1 values\n"
+ + "('id2','Stephen',33, true,'par1', 1.20)";
+ } else {
+ expected = doCompaction
+ // first delete record will be ignored during compaction
+ ? "[+I[id1, Danny, 23, false, par1, 1.10], +I[id2, Stephen, 33,
false, par1, 1.20]]"
+ : "[+I[id1, Danny, 23, false, par1, 1.10]]";
+ // first commit
+ insertInto = "insert into t1 values\n"
+ + "('id1','Danny',23,false,'par1', 1.10),\n"
+ + "('id2','Stephen',33,true,'par1', 1.30)";
+ execInsertSql(batchTableEnv, insertInto);
+ // second commit, hard delete record with smaller order value
+ insertInto = "insert into t1 values\n"
+ + "('id2','Stephen',33, false,'par1', 1.20)";
+ }
+ execInsertSql(batchTableEnv, insertInto);
+ List<Row> result2 = execSelectSql(batchTableEnv, "select * from t1",
execMode);
+ // no record is deleted.
+ assertRowsEquals(result2, expected);
+ }
+
@ParameterizedTest
@MethodSource("tableTypeAndBooleanTrueFalseParams")
void testStreamReadFilterByPartition(HoodieTableType tableType, boolean
hiveStylePartitioning) throws Exception {
@@ -2244,6 +2363,19 @@ public class ITTestHoodieDataSource {
return Stream.of(data).map(Arguments::of);
}
+ /**
+ * Return test params => (SceneType, true/false).
+ */
+ private static Stream<Arguments> sceneTypeAndCompactionEnabled() {
+ Object[][] data =
+ new Object[][] {
+ {true, false},
+ {false, false},
+ {true, true},
+ {false, true}};
+ return Stream.of(data).map(Arguments::of);
+ }
+
private void execInsertSql(TableEnvironment tEnv, String insert) {
TableResult tableResult = tEnv.executeSql(insert);
// wait to finish
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
index 0989b8b09aee..cdd5669baf65 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala
@@ -130,7 +130,7 @@ class ExpressionPayload(@transient record: GenericRecord,
.serialize(resultingRow)
.asInstanceOf[GenericRecord]
- if (targetRecord.isEmpty ||
needUpdatingPersistedRecord(targetRecord.get, resultingAvroRecord, properties))
{
+ if (targetRecord.isEmpty ||
needUpdatingPersistedRecord(targetRecord.get, HOption.of(resultingAvroRecord),
properties)) {
resultRecordOpt = HOption.of(resultingAvroRecord)
} else {
// if the PreCombine field value of targetRecord is greater