This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c36a5f7f517e fix(flink): Avoid emitting deletes for Flink source v2 batch reads (#18694)
c36a5f7f517e is described below
commit c36a5f7f517e3349385d95bf1ff1e55ba9caa275
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu May 7 08:49:13 2026 +0800
fix(flink): Avoid emitting deletes for Flink source v2 batch reads (#18694)
---
.../org/apache/hudi/table/HoodieTableSource.java | 2 +-
.../apache/hudi/table/ITTestHoodieDataSource.java | 22 ++++++++++++++++++++++
2 files changed, 23 insertions(+), 1 deletion(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 69fc951a6401..d1b13ff7e375 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -310,7 +310,7 @@ public class HoodieTableSource extends FileIndexReader implements
tableSchema.toString(),
HoodieSchemaConverter.convertToSchema(requiredRowType).toString(),
new ArrayList<>());
- boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ;
+ boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ &&
context.isStreaming();
if (conf.get(FlinkOptions.CDC_ENABLED)) {
List<DataType> fieldTypes = rowDataType.getChildren();
splitReaderFunction = new HoodieCdcSplitReaderFunction(
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 60aa113eea50..37d5e9169ff3 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -3283,6 +3283,28 @@ public class ITTestHoodieDataSource {
+ "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
}
+ /**
+  * Regression test for batch (non-streaming) reads of a MERGE_ON_READ table after a SQL DELETE.
+  * With READ_AS_STREAMING disabled, the query result must contain only the surviving rows;
+  * per this commit, no delete records should be emitted for batch reads.
+  * Runs once with READ_SOURCE_V2_ENABLED on and once with it off.
+  *
+  * @param useSourceV2 value passed to FlinkOptions.READ_SOURCE_V2_ENABLED
+  */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testIgnoreEmitDeleteForBatchReading(boolean useSourceV2) {
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .options(getDefaultKeys())
+ .option(FlinkOptions.READ_AS_STREAMING, false)
+ .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
+ .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2)
+ .end();
+
+ batchTableEnv.executeSql(hoodieTableDDL);
+ execInsertSql(batchTableEnv, TestSQL.INSERT_T1);
+ // delete EQ(=): remove the record whose uuid is 'id1'
+ final String deleteSql = "delete from t1 where uuid = 'id1'";
+ execInsertSql(batchTableEnv, deleteSql);
+ List<Row> rows1 = CollectionUtil.iterableToList(
+ () -> batchTableEnv.sqlQuery("select * from t1").execute().collect());
+ // Expected: the inserted data set with the record at index 0 removed;
+ // presumably index 0 is the 'id1' row deleted above -- confirm against TestData.
+ List<RowData> expected = TestData.delete(TestData.DATA_SET_SOURCE_INSERT,
0);
+ assertRowsEquals(rows1, expected);
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------