This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch release-2.1.6
in repository https://gitbox.apache.org/repos/asf/streampark.git
The following commit(s) were added to refs/heads/release-2.1.6 by this push:
new 4ff8df061 [Improve] get flink-config-file improvement
4ff8df061 is described below
commit 4ff8df061bac0d604e8a8862bc5ceb05b8a2a701
Author: benjobs <[email protected]>
AuthorDate: Sun Sep 14 22:02:51 2025 +0800
[Improve] get flink-config-file improvement
---
.../streampark/console/core/entity/FlinkEnv.java | 84 ++++++++++++++++------
1 file changed, 63 insertions(+), 21 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
index 9ccf90773..2cb6cd858 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
@@ -24,6 +24,7 @@ import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApiDetailException;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
@@ -67,33 +68,61 @@ public class FlinkEnv implements Serializable {
private transient String streamParkScalaVersion =
scala.util.Properties.versionNumberString();
+ private static final Float FLINK_CONFIG_CHANGE_VERSION = 1.19f;
+ private static final String LEGACY_CONFIG_FILE = "flink-conf.yaml";
+ private static final String NEW_CONFIG_FILE = "config.yaml";
+ private static final String CONF_DIR = "/conf/";
+
public void doSetFlinkConf() throws ApiDetailException {
- File yaml;
- float ver =
Float.parseFloat(getVersionOfFirst().concat(".").concat(getVersionOfMiddle()));
- if (ver < 1.19f) {
- yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
- if (!yaml.exists()) {
- throw new ApiAlertException("cannot find flink-conf.yaml in flink/conf
");
+ Float version = getVersionNumber();
+ File configFile = resolveConfigFile(version);
+
+ try {
+ String flinkConf = FileUtils.readFileToString(configFile,
StandardCharsets.UTF_8);
+ this.flinkConf = DeflaterUtils.zipString(flinkConf);
+ } catch (Exception e) {
+ throw new ApiDetailException(
+ "Failed to read Flink configuration file: " +
configFile.getAbsolutePath(), e);
+ }
+ }
+
+ private File resolveConfigFile(Float version) throws ApiAlertException {
+ String confDir = this.flinkHome + CONF_DIR;
+
+ if (version < FLINK_CONFIG_CHANGE_VERSION) {
+ // For Flink < 1.19, use flink-conf.yaml
+ File configFile = new File(confDir + LEGACY_CONFIG_FILE);
+ if (!configFile.exists()) {
+ throw new ApiAlertException(
+ String.format(
+ "Cannot find %s in %s for Flink version %s",
+ LEGACY_CONFIG_FILE, confDir, this.version));
}
- } else if (ver == 1.19f) {
- yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
- if (!yaml.exists()) {
- yaml = new File(this.flinkHome.concat("/conf/config.yaml"));
+ return configFile;
+ } else if (version.equals(FLINK_CONFIG_CHANGE_VERSION)) {
+ // For Flink 1.19, try both config files (backward compatibility)
+ File legacyConfigFile = new File(confDir + LEGACY_CONFIG_FILE);
+ if (legacyConfigFile.exists()) {
+ return legacyConfigFile;
}
- if (!yaml.exists()) {
- throw new ApiAlertException("cannot find config.yaml|flink-conf.yaml
in flink/conf ");
+ File newConfigFile = new File(confDir + NEW_CONFIG_FILE);
+ if (newConfigFile.exists()) {
+ return newConfigFile;
}
+ throw new ApiAlertException(
+ String.format(
+ "Cannot find either %s or %s in %s for Flink version %s",
+ LEGACY_CONFIG_FILE, NEW_CONFIG_FILE, confDir, this.version));
} else {
- yaml = new File(this.flinkHome.concat("/conf/config.yaml"));
- if (!yaml.exists()) {
- throw new ApiAlertException("cannot find config.yaml in flink/conf ");
+ // For Flink > 1.19, use config.yaml
+ File configFile = new File(confDir + NEW_CONFIG_FILE);
+ if (!configFile.exists()) {
+ throw new ApiAlertException(
+ String.format(
+ "Cannot find %s in %s for Flink version %s",
+ NEW_CONFIG_FILE, confDir, this.version));
}
- }
- try {
- String flinkConf = FileUtils.readFileToString(yaml,
StandardCharsets.UTF_8);
- this.flinkConf = DeflaterUtils.zipString(flinkConf);
- } catch (Exception e) {
- throw new ApiDetailException(e);
+ return configFile;
}
}
@@ -144,4 +173,17 @@ public class FlinkEnv implements Serializable {
public String getVersionOfLast() {
return this.version.split("\\.")[2];
}
+
+ @JsonIgnore
+ private Float getVersionNumber() {
+ if (StringUtils.isNotBlank(this.version)) {
+ return Float.parseFloat(getVersionOfFirst() + "." +
getVersionOfMiddle());
+ }
+ throw new RuntimeException("Flink version is null");
+ }
+
+ @JsonIgnore
+ public boolean isLegacyFlinkConf() {
+ return getVersionNumber() < FLINK_CONFIG_CHANGE_VERSION;
+ }
}