This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new da9f25f91 [ISSUES-3627] Fix Adding flink1.19 environment configuration
exception (#3629)
da9f25f91 is described below
commit da9f25f914e732b72b89ab7b2659daf11bf3cbb4
Author: hackallan <[email protected]>
AuthorDate: Tue Mar 26 22:14:22 2024 +0800
[ISSUES-3627] Fix Adding flink1.19 environment configuration exception
(#3629)
* Added Flink conf compatibility: flink-conf.yaml is replaced by config.yaml
starting with Flink 1.19 (1.19 itself accepts either file)
---------
Co-authored-by: qinjiyong <[email protected]>
---
.../streampark/console/core/entity/FlinkEnv.java | 24 +++++++++++++++++++++-
1 file changed, 23 insertions(+), 1 deletion(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
index b56edf631..d8a2134d7 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
@@ -20,6 +20,7 @@ package org.apache.streampark.console.core.entity;
import org.apache.streampark.common.conf.FlinkVersion;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApiDetailException;
import org.apache.commons.io.FileUtils;
@@ -72,8 +73,29 @@ public class FlinkEnv implements Serializable {
private transient String streamParkScalaVersion =
scala.util.Properties.versionNumberString();
public void doSetFlinkConf() throws ApiDetailException {
+
+ File yaml;
+ float ver =
Float.parseFloat(getVersionOfFirst().concat(".").concat(getVersionOfMiddle()));
+ if (ver < 1.19f) {
+ yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
+ if (!yaml.exists()) {
+ throw new ApiAlertException("cannot find flink-conf.yaml in flink/conf
");
+ }
+ } else if (ver == 1.19f) {
+ yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
+ if (!yaml.exists()) {
+ yaml = new File(this.flinkHome.concat("/conf/config.yaml"));
+ }
+ if (!yaml.exists()) {
+ throw new ApiAlertException("cannot find config.yaml|flink-conf.yaml
in flink/conf ");
+ }
+ } else {
+ yaml = new File(this.flinkHome.concat("/conf/config.yaml"));
+ if (!yaml.exists()) {
+ throw new ApiAlertException("cannot find config.yaml in flink/conf ");
+ }
+ }
try {
- File yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
String flinkConf = FileUtils.readFileToString(yaml,
StandardCharsets.UTF_8);
this.flinkConf = DeflaterUtils.zipString(flinkConf);
} catch (Exception e) {