This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 737bd442686b refactor: Adapt delete handling with DeleteContext for HoodieFlinkRecord (#13971)
737bd442686b is described below
commit 737bd442686bf1e906d2c82bcef954229af7e5d9
Author: Shuo Cheng <[email protected]>
AuthorDate: Wed Sep 24 18:47:16 2025 +0800
refactor: Adapt delete handling with DeleteContext for HoodieFlinkRecord (#13971)
---
.../apache/hudi/client/model/HoodieFlinkRecord.java | 18 +++---------------
.../apache/hudi/table/format/FlinkRecordContext.java | 10 ++++++++++
2 files changed, 13 insertions(+), 15 deletions(-)
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
index 1bf0d0bd5549..752e88cf4379 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.util.OrderingValues;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.table.format.FlinkRecordContext;
import org.apache.hudi.util.RowDataAvroQueryContexts;
import org.apache.hudi.util.RowDataAvroQueryContexts.RowDataQueryContext;
import org.apache.hudi.util.RowProjection;
@@ -218,24 +219,11 @@ public class HoodieFlinkRecord extends HoodieRecord<RowData> {
@Override
protected boolean checkIsDelete(DeleteContext deleteContext, Properties props) {
- if (data == null) {
+ if (data == null || HoodieOperation.isDelete(getOperation())) {
return true;
}
- if (HoodieOperation.isDelete(getOperation())) {
- return true;
- }
-
- // Use data field to decide.
- Schema.Field deleteField = deleteContext.getReaderSchema().getField(HOODIE_IS_DELETED_FIELD);
- if (deleteField != null && data.getBoolean(deleteField.pos())) {
- return true;
- }
- // check custom delete marker
- return deleteContext.getCustomDeleteMarkerKeyValue().map(markerKeyValue -> {
- Object fieldValue = getColumnValueAsJava(deleteContext.getReaderSchema(), markerKeyValue.getKey(), props, true);
- return markerKeyValue.getValue().equals(fieldValue);
- }).orElse(false);
+ return FlinkRecordContext.getDeleteCheckingInstance().isDeleteRecord(data, deleteContext);
}
@Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
index 2d6b135ed028..16db333fcfa7 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
@@ -52,6 +52,7 @@ import java.util.Map;
import java.util.function.UnaryOperator;
public class FlinkRecordContext extends RecordContext<RowData> {
+ private static final FlinkRecordContext DELETE_CHECKING_INSTANCE = new FlinkRecordContext(true);
private final boolean utcTimezone;
// the converter is used to create a RowData contains primary key fields only
@@ -65,6 +66,15 @@ public class FlinkRecordContext extends RecordContext<RowData> {
this.utcTimezone = storageConf.getBoolean("read.utc-timezone",true);
}
+ private FlinkRecordContext(boolean utcTimezone) {
+ super(new DefaultJavaTypeConverter());
+ this.utcTimezone = utcTimezone;
+ }
+
+ public static FlinkRecordContext getDeleteCheckingInstance() {
+ return DELETE_CHECKING_INSTANCE;
+ }
+
@Override
public Object getValue(RowData record, Schema schema, String fieldName) {
RowDataAvroQueryContexts.FieldQueryContext fieldQueryContext =