This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new da9f25f91 [ISSUES-3627] Fix Adding flink1.19 environment configuration 
exception (#3629)
da9f25f91 is described below

commit da9f25f914e732b72b89ab7b2659daf11bf3cbb4
Author: hackallan <[email protected]>
AuthorDate: Tue Mar 26 22:14:22 2024 +0800

    [ISSUES-3627] Fix Adding flink1.19 environment configuration exception 
(#3629)
    
    * Added flink conf compatibility, flink-conf.yaml changed to conf.yaml 
after flink 1.19
    ---------
    
    Co-authored-by: qinjiyong <[email protected]>
---
 .../streampark/console/core/entity/FlinkEnv.java   | 24 +++++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
index b56edf631..d8a2134d7 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java
@@ -20,6 +20,7 @@ package org.apache.streampark.console.core.entity;
 import org.apache.streampark.common.conf.FlinkVersion;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApiDetailException;
 
 import org.apache.commons.io.FileUtils;
@@ -72,8 +73,29 @@ public class FlinkEnv implements Serializable {
   private transient String streamParkScalaVersion = 
scala.util.Properties.versionNumberString();
 
   public void doSetFlinkConf() throws ApiDetailException {
+
+    File yaml;
+    float ver = 
Float.parseFloat(getVersionOfFirst().concat(".").concat(getVersionOfMiddle()));
+    if (ver < 1.19f) {
+      yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
+      if (!yaml.exists()) {
+        throw new ApiAlertException("cannot find flink-conf.yaml in flink/conf 
");
+      }
+    } else if (ver == 1.19f) {
+      yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
+      if (!yaml.exists()) {
+        yaml = new File(this.flinkHome.concat("/conf/config.yaml"));
+      }
+      if (!yaml.exists()) {
+        throw new ApiAlertException("cannot find config.yaml|flink-conf.yaml 
in flink/conf ");
+      }
+    } else {
+      yaml = new File(this.flinkHome.concat("/conf/config.yaml"));
+      if (!yaml.exists()) {
+        throw new ApiAlertException("cannot find config.yaml in flink/conf ");
+      }
+    }
     try {
-      File yaml = new File(this.flinkHome.concat("/conf/flink-conf.yaml"));
       String flinkConf = FileUtils.readFileToString(yaml, 
StandardCharsets.UTF_8);
       this.flinkConf = DeflaterUtils.zipString(flinkConf);
     } catch (Exception e) {

Reply via email to