This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.2.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit dd2773a1121205276efa09d6275f133085be0218 Author: Shuo Cheng <[email protected]> AuthorDate: Thu May 7 08:49:13 2026 +0800 fix(flink): Avoid emitting deletes for Flink source v2 batch reads (#18694) --- .../org/apache/hudi/table/HoodieTableSource.java | 2 +- .../apache/hudi/table/ITTestHoodieDataSource.java | 22 ++++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index 69fc951a6401..d1b13ff7e375 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -310,7 +310,7 @@ public class HoodieTableSource extends FileIndexReader implements tableSchema.toString(), HoodieSchemaConverter.convertToSchema(requiredRowType).toString(), new ArrayList<>()); - boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ; + boolean emitDelete = tableType == HoodieTableType.MERGE_ON_READ && context.isStreaming(); if (conf.get(FlinkOptions.CDC_ENABLED)) { List<DataType> fieldTypes = rowDataType.getChildren(); splitReaderFunction = new HoodieCdcSplitReaderFunction( diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 60aa113eea50..37d5e9169ff3 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -3283,6 +3283,28 @@ public class ITTestHoodieDataSource { + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]"); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testIgnoreEmitDeleteForBatchReading(boolean useSourceV2) { + String hoodieTableDDL = sql("t1") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .options(getDefaultKeys()) + .option(FlinkOptions.READ_AS_STREAMING, false) + .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ) + .option(FlinkOptions.READ_SOURCE_V2_ENABLED, useSourceV2) + .end(); + + batchTableEnv.executeSql(hoodieTableDDL); + execInsertSql(batchTableEnv, TestSQL.INSERT_T1); + // delete EQ(=) + final String deleteSql = "delete from t1 where uuid = 'id1'"; + execInsertSql(batchTableEnv, deleteSql); + List<Row> rows1 = CollectionUtil.iterableToList( + () -> batchTableEnv.sqlQuery("select * from t1").execute().collect()); + List<RowData> expected = TestData.delete(TestData.DATA_SET_SOURCE_INSERT, 0); + assertRowsEquals(rows1, expected); + } + // ------------------------------------------------------------------------- // Utilities // -------------------------------------------------------------------------
