This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6de81b8664bd fix: disable embedded timeline service for flink upgrade
(#14096)
6de81b8664bd is described below
commit 6de81b8664bdd8d555385c424913aee415f67036
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Oct 16 15:06:10 2025 +0800
fix: disable embedded timeline service for flink upgrade (#14096)
---
.../table/upgrade/FlinkUpgradeDowngradeHelper.java | 4 +++
.../apache/hudi/table/ITTestHoodieDataSource.java | 35 ++++++++++++++++++++++
2 files changed, 39 insertions(+)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java
index dc4ab6dc4f92..c56494a7cce0 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java
@@ -54,6 +54,10 @@ public class FlinkUpgradeDowngradeHelper implements
SupportsUpgradeDowngrade {
@Override
public BaseHoodieWriteClient getWriteClient(HoodieWriteConfig config,
HoodieEngineContext context) {
+ // Because Flink enables reusing of embedded timeline service by default,
disable the embedded timeline service to avoid closing of the reused timeline
server.
+ // The write config inherits from the info of the currently running
timeline server started in coordinator, so even though the flag is disabled, it
still can
+ // access the remote timeline server started before.
+ config.setValue(HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLE.key(),
"false");
return new HoodieFlinkWriteClient(context, config);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 0a317c5f40b6..b08e772c198a 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.CollectionUtils;
@@ -2592,6 +2593,40 @@ public class ITTestHoodieDataSource {
assertRowsEquals(rows, TestData.DATA_SET_SINGLE_INSERT);
}
+ @Test
+ void testStreamWriteAndReadWithUpgrade() throws Exception {
+ String createSource = TestConfigurations.getFileSourceDDL("source");
+ streamTableEnv.executeSql(createSource);
+
+ // Step 1: create MERGE_ON_READ table t1 with write table version SIX and insert the source data
+ String hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ)
+ .option(FlinkOptions.WRITE_TABLE_VERSION,
HoodieTableVersion.SIX.versionCode() + "")
+ .option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false)
+ .end();
+ streamTableEnv.executeSql(hoodieTableDDL);
+ String insertInto = "insert into t1 select * from source";
+ execInsertSql(streamTableEnv, insertInto);
+
+ streamTableEnv.executeSql("drop table t1");
+
+ hoodieTableDDL = sql("t1")
+ .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+ .option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ)
+ .option(FlinkOptions.WRITE_TABLE_VERSION,
HoodieTableVersion.EIGHT.versionCode() + "")
+ .option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false)
+ .end();
+
+ // Step 2: re-declare t1 on the same path with write table version EIGHT and insert the source data again
+ streamTableEnv.executeSql(hoodieTableDDL);
+ insertInto = "insert into t1 select * from source";
+ execInsertSql(streamTableEnv, insertInto);
+
+ List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+ assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+ }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------