This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch release-2.1.6
in repository https://gitbox.apache.org/repos/asf/streampark.git


The following commit(s) were added to refs/heads/release-2.1.6 by this push:
     new 4ff8df061 [Improve] get flink-config-file improvement
4ff8df061 is described below

commit 4ff8df061bac0d604e8a8862bc5ceb05b8a2a701
Author: benjobs <[email protected]>
AuthorDate: Sun Sep 14 22:02:51 2025 +0800

    [Improve] get flink-config-file improvement
---
 .../streampark/console/core/entity/FlinkEnv.java   | 84 ++++++++++++++++------
 1 file changed, 63 insertions(+), 21 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
index 9ccf90773..2cb6cd858 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
@@ -24,6 +24,7 @@ import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApiDetailException;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableId;
@@ -67,33 +68,61 @@ public class FlinkEnv implements Serializable {
 
   private transient String streamParkScalaVersion = scala.util.Properties.versionNumberString();
 
+  private static final Float FLINK_CONFIG_CHANGE_VERSION = 1.19f;
+  private static final String LEGACY_CONFIG_FILE = "flink-conf.yaml";
+  private static final String NEW_CONFIG_FILE = "config.yaml";
+  private static final String CONF_DIR = "/conf/";
+
   public void doSetFlinkConf() throws ApiDetailException {
-    File yaml;
-    float ver = Float.parseFloat(getVersionOfFirst().concat(".").concat(getVersionOfMiddle()));
-    if (ver < 1.19f) {
-      yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
-      if (!yaml.exists()) {
-        throw new ApiAlertException("cannot find flink-conf.yaml in flink/conf ");
+    Float version = getVersionNumber();
+    File configFile = resolveConfigFile(version);
+
+    try {
+      String flinkConf = FileUtils.readFileToString(configFile, StandardCharsets.UTF_8);
+      this.flinkConf = DeflaterUtils.zipString(flinkConf);
+    } catch (Exception e) {
+      throw new ApiDetailException(
+          "Failed to read Flink configuration file: " + configFile.getAbsolutePath(), e);
+    }
+  }
+
+  private File resolveConfigFile(Float version) throws ApiAlertException {
+    String confDir = this.flinkHome + CONF_DIR;
+
+    if (version < FLINK_CONFIG_CHANGE_VERSION) {
+      // For Flink < 1.19, use flink-conf.yaml
+      File configFile = new File(confDir + LEGACY_CONFIG_FILE);
+      if (!configFile.exists()) {
+        throw new ApiAlertException(
+            String.format(
+                "Cannot find %s in %s for Flink version %s",
+                LEGACY_CONFIG_FILE, confDir, this.version));
       }
-    } else if (ver == 1.19f) {
-      yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
-      if (!yaml.exists()) {
-        yaml = new File(this.flinkHome.concat("/conf/config.yaml"));
+      return configFile;
+    } else if (version.equals(FLINK_CONFIG_CHANGE_VERSION)) {
+      // For Flink 1.19, try both config files (backward compatibility)
+      File legacyConfigFile = new File(confDir + LEGACY_CONFIG_FILE);
+      if (legacyConfigFile.exists()) {
+        return legacyConfigFile;
       }
-      if (!yaml.exists()) {
-        throw new ApiAlertException("cannot find config.yaml|flink-conf.yaml in flink/conf ");
+      File newConfigFile = new File(confDir + NEW_CONFIG_FILE);
+      if (newConfigFile.exists()) {
+        return newConfigFile;
       }
+      throw new ApiAlertException(
+          String.format(
+              "Cannot find either %s or %s in %s for Flink version %s",
+              LEGACY_CONFIG_FILE, NEW_CONFIG_FILE, confDir, this.version));
     } else {
-      yaml = new File(this.flinkHome.concat("/conf/config.yaml"));
-      if (!yaml.exists()) {
-        throw new ApiAlertException("cannot find config.yaml in flink/conf ");
+      // For Flink > 1.19, use config.yaml
+      File configFile = new File(confDir + NEW_CONFIG_FILE);
+      if (!configFile.exists()) {
+        throw new ApiAlertException(
+            String.format(
+                "Cannot find %s in %s for Flink version %s",
+                NEW_CONFIG_FILE, confDir, this.version));
       }
-    }
-    try {
-      String flinkConf = FileUtils.readFileToString(yaml, StandardCharsets.UTF_8);
-      this.flinkConf = DeflaterUtils.zipString(flinkConf);
-    } catch (Exception e) {
-      throw new ApiDetailException(e);
+      return configFile;
     }
   }
 
@@ -144,4 +173,17 @@ public class FlinkEnv implements Serializable {
   public String getVersionOfLast() {
     return this.version.split("\\.")[2];
   }
+
+  @JsonIgnore
+  private Float getVersionNumber() {
+    if (StringUtils.isNotBlank(this.version)) {
+      return Float.parseFloat(getVersionOfFirst() + "." + getVersionOfMiddle());
+    }
+    throw new RuntimeException("Flink version is null");
+  }
+
+  @JsonIgnore
+  public boolean isLegacyFlinkConf() {
+    return getVersionNumber() < FLINK_CONFIG_CHANGE_VERSION;
+  }
 }

Reply via email to