This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 737bd442686b refactor: Adapt delete handling with DeleteContext for HoodieFlinkRecord (#13971)
737bd442686b is described below

commit 737bd442686bf1e906d2c82bcef954229af7e5d9
Author: Shuo Cheng <[email protected]>
AuthorDate: Wed Sep 24 18:47:16 2025 +0800

    refactor: Adapt delete handling with DeleteContext for HoodieFlinkRecord (#13971)
---
 .../apache/hudi/client/model/HoodieFlinkRecord.java    | 18 +++---------------
 .../apache/hudi/table/format/FlinkRecordContext.java   | 10 ++++++++++
 2 files changed, 13 insertions(+), 15 deletions(-)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
index 1bf0d0bd5549..752e88cf4379 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.util.OrderingValues;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.table.format.FlinkRecordContext;
 import org.apache.hudi.util.RowDataAvroQueryContexts;
 import org.apache.hudi.util.RowDataAvroQueryContexts.RowDataQueryContext;
 import org.apache.hudi.util.RowProjection;
@@ -218,24 +219,11 @@ public class HoodieFlinkRecord extends HoodieRecord<RowData> {
 
   @Override
   protected boolean checkIsDelete(DeleteContext deleteContext, Properties props) {
-    if (data == null) {
+    if (data == null || HoodieOperation.isDelete(getOperation())) {
       return true;
     }
 
-    if (HoodieOperation.isDelete(getOperation())) {
-      return true;
-    }
-
-    // Use data field to decide.
-    Schema.Field deleteField = deleteContext.getReaderSchema().getField(HOODIE_IS_DELETED_FIELD);
-    if (deleteField != null && data.getBoolean(deleteField.pos())) {
-      return true;
-    }
-    // check custom delete marker
-    return deleteContext.getCustomDeleteMarkerKeyValue().map(markerKeyValue -> {
-      Object fieldValue = getColumnValueAsJava(deleteContext.getReaderSchema(), markerKeyValue.getKey(), props, true);
-      return markerKeyValue.getValue().equals(fieldValue);
-    }).orElse(false);
+    return FlinkRecordContext.getDeleteCheckingInstance().isDeleteRecord(data, deleteContext);
   }
 
   @Override
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
index 2d6b135ed028..16db333fcfa7 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
@@ -52,6 +52,7 @@ import java.util.Map;
 import java.util.function.UnaryOperator;
 
 public class FlinkRecordContext extends RecordContext<RowData> {
+  private static final FlinkRecordContext DELETE_CHECKING_INSTANCE = new FlinkRecordContext(true);
 
   private final boolean utcTimezone;
   // the converter is used to create a RowData contains primary key fields only
@@ -65,6 +66,15 @@ public class FlinkRecordContext extends RecordContext<RowData> {
     this.utcTimezone = storageConf.getBoolean("read.utc-timezone",true);
   }
 
+  private FlinkRecordContext(boolean utcTimezone) {
+    super(new DefaultJavaTypeConverter());
+    this.utcTimezone = utcTimezone;
+  }
+
+  public static FlinkRecordContext getDeleteCheckingInstance() {
+    return DELETE_CHECKING_INSTANCE;
+  }
+
   @Override
   public Object getValue(RowData record, Schema schema, String fieldName) {
     RowDataAvroQueryContexts.FieldQueryContext fieldQueryContext =

Reply via email to