This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.1.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit df7a366bc5e66ed5e50b84c7f089ebad78802d2c Author: Shuo Cheng <[email protected]> AuthorDate: Thu Oct 16 15:06:10 2025 +0800 fix: disable embedded timeline service for flink upgrade (#14096) --- .../table/upgrade/FlinkUpgradeDowngradeHelper.java | 4 +++ .../apache/hudi/table/ITTestHoodieDataSource.java | 35 ++++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java index dc4ab6dc4f92..c56494a7cce0 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java @@ -54,6 +54,10 @@ public class FlinkUpgradeDowngradeHelper implements SupportsUpgradeDowngrade { @Override public BaseHoodieWriteClient getWriteClient(HoodieWriteConfig config, HoodieEngineContext context) { + // Because flink enables reusing of embedded timeline service by default, disable the embedded time service to avoid closing of the reused timeline server. + // The write config inherits from the info of the currently running timeline server started in coordinator, even though the flag is disabled, it still can + // access the remote timeline server started before. + config.setValue(HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLE.key(), "false"); return new HoodieFlinkWriteClient(context, config); } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 0a317c5f40b6..b08e772c198a 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CollectionUtils; @@ -2592,6 +2593,40 @@ public class ITTestHoodieDataSource { assertRowsEquals(rows, TestData.DATA_SET_SINGLE_INSERT); } + @Test + void testStreamWriteAndReadWithUpgrade() throws Exception { + String createSource = TestConfigurations.getFileSourceDDL("source"); + streamTableEnv.executeSql(createSource); + + // init and write data with table version SIX + String hoodieTableDDL = sql("t1") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ) + .option(FlinkOptions.WRITE_TABLE_VERSION, HoodieTableVersion.SIX.versionCode() + "") + .option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false) + .end(); + streamTableEnv.executeSql(hoodieTableDDL); + String insertInto = "insert into t1 select * from source"; + execInsertSql(streamTableEnv, insertInto); + + streamTableEnv.executeSql("drop table t1"); + + hoodieTableDDL = sql("t1") + .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) + .option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ) + .option(FlinkOptions.WRITE_TABLE_VERSION, HoodieTableVersion.EIGHT.versionCode() + "") + .option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false) + .end(); + + // write another batch of data with table version EIGHT + streamTableEnv.executeSql(hoodieTableDDL); + insertInto = "insert into t1 select * from source"; + execInsertSql(streamTableEnv, insertInto); + + List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10); + assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT); + } + // ------------------------------------------------------------------------- // Utilities // -------------------------------------------------------------------------
