This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6de81b8664bd fix: disable embedded timeline service for flink upgrade (#14096)
6de81b8664bd is described below

commit 6de81b8664bdd8d555385c424913aee415f67036
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Oct 16 15:06:10 2025 +0800

    fix: disable embedded timeline service for flink upgrade (#14096)
---
 .../table/upgrade/FlinkUpgradeDowngradeHelper.java |  4 +++
 .../apache/hudi/table/ITTestHoodieDataSource.java  | 35 ++++++++++++++++++++++
 2 files changed, 39 insertions(+)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java
index dc4ab6dc4f92..c56494a7cce0 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/upgrade/FlinkUpgradeDowngradeHelper.java
@@ -54,6 +54,10 @@ public class FlinkUpgradeDowngradeHelper implements SupportsUpgradeDowngrade {
 
   @Override
   public BaseHoodieWriteClient getWriteClient(HoodieWriteConfig config, HoodieEngineContext context) {
+    // Because flink enables reusing of embedded timeline service by default, disable the embedded timeline service to avoid closing of the reused timeline server.
+    // The write config inherits the info of the currently running timeline server started in the coordinator; even though the flag is disabled, it can still
+    // access the remote timeline server started before.
+    config.setValue(HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLE.key(), "false");
     return new HoodieFlinkWriteClient(context, config);
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 0a317c5f40b6..b08e772c198a 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CollectionUtils;
@@ -2592,6 +2593,40 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(rows, TestData.DATA_SET_SINGLE_INSERT);
   }
 
+  @Test
+  void testStreamWriteAndReadWithUpgrade() throws Exception {
+    String createSource = TestConfigurations.getFileSourceDDL("source");
+    streamTableEnv.executeSql(createSource);
+
+    // init and write data with table version SIX
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ)
+        .option(FlinkOptions.WRITE_TABLE_VERSION, HoodieTableVersion.SIX.versionCode() + "")
+        .option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false)
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+    String insertInto = "insert into t1 select * from source";
+    execInsertSql(streamTableEnv, insertInto);
+
+    streamTableEnv.executeSql("drop table t1");
+
+    hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .option(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ)
+        .option(FlinkOptions.WRITE_TABLE_VERSION, HoodieTableVersion.EIGHT.versionCode() + "")
+        .option(HoodieWriteConfig.ALLOW_EMPTY_COMMIT.key(), false)
+        .end();
+
+    // write another batch of data with table version EIGHT
+    streamTableEnv.executeSql(hoodieTableDDL);
+    insertInto = "insert into t1 select * from source";
+    execInsertSql(streamTableEnv, insertInto);
+
+    List<Row> rows = execSelectSql(streamTableEnv, "select * from t1", 10);
+    assertRowsEquals(rows, TestData.DATA_SET_SOURCE_INSERT);
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------

Reply via email to