This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c36a5f7f517e fix(flink): Avoid emitting deletes for Flink source v2 batch reads (#18694)
c36a5f7f517e is described below

commit c36a5f7f517e3349385d95bf1ff1e55ba9caa275
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu May 7 08:49:13 2026 +0800

    fix(flink): Avoid emitting deletes for Flink source v2 batch reads (#18694)
---
 .../org/apache/hudi/table/HoodieTableSource.java   |  2 +-
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 22 ++++++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 69fc951a6401..d1b13ff7e375 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -310,7 +310,7 @@ public class HoodieTableSource extends FileIndexReader implements
             tableSchema.toString(),
             HoodieSchemaConverter.convertToSchema(requiredRowType).toString(),
             new ArrayList<>());
-    boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
+    boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ && context.isStreaming();
     if (conf.get(FlinkOptions.CDC_ENABLED)) {
       List<DataType> fieldTypes = rowDataType.getChildren();
       splitReaderFunction = new HoodieCdcSplitReaderFunction(
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 60aa113eea50..37d5e9169ff3 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -3283,6 +3283,28 @@ public class ITTestHoodieDataSource {
         + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testIgnoreEmitDeleteForBatchReading(boolean useSourceV2) {
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
+        .option(FlinkOptions.READ_AS_STREAMING, false)
+        .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
+        .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2)
+        .end();
+
+    batchTableEnv.executeSql(hoodieTableDDL);
+    execInsertSql(batchTableEnv, TestSQL.INSERT_T1);
+    // delete EQ(=)
+    final String deleteSql = "delete from t1 where uuid = 'id1'";
+    execInsertSql(batchTableEnv, deleteSql);
+    List<Row> rows1 = CollectionUtil.iterableToList(
+        () -> batchTableEnv.sqlQuery("select * from t1").execute().collect());
+    List<RowData> expected = TestData.delete(TestData.DATA_SET_SOURCE_INSERT, 0);
+    assertRowsEquals(rows1, expected);
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------

Reply via email to