This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
     new 8bd8fc090 [Improve] config.yaml configuration file compatibility 
improvements
8bd8fc090 is described below

commit 8bd8fc0903f892159eeecc44f562cfae672d7f14
Author: benjobs <[email protected]>
AuthorDate: Sat Apr 6 15:25:53 2024 +0800

    [Improve] config.yaml configuration file compatibility improvements
---
 .../src/main/assembly/bin/jvm_opts.sh              |   4 +
 .../src/main/assembly/bin/streampark.sh            |  63 +++++-----
 .../console/base/config/MybatisConfig.java         |   8 +-
 .../console/base/config/SpringProperties.java      |  66 +++++++---
 .../console/base/util/BashJavaUtils.java           |  43 +++++++
 .../streampark/flink/core/conf/ParameterCli.scala  | 137 ++++++++++-----------
 6 files changed, 193 insertions(+), 128 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/bin/jvm_opts.sh
 
b/streampark-console/streampark-console-service/src/main/assembly/bin/jvm_opts.sh
index 93fc9083e..9a7bdff7d 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/bin/jvm_opts.sh
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/bin/jvm_opts.sh
@@ -17,6 +17,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+-server
 -Xms1g
 -Xmx1g
 -Xmn512m
@@ -33,3 +34,6 @@
 -XX:GCLogFileSize=50M
 -XX:NumberOfGCLogFiles=10
 
+# solved jdk1.8+ dynamic loading of resources to the classpath issue, if jdk > 
1.8, you can enable this parameter
+#--add-opens java.base/jdk.internal.loader=ALL-UNNAMED --add-opens 
jdk.zipfs/jdk.nio.zipfs=ALL-UNNAMED
+
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
 
b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
index 41b9d0628..4bfbaf89b 100755
--- 
a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
@@ -137,12 +137,6 @@ APP_OUT="$APP_LOG"/streampark.out
 # shellcheck disable=SC2034
 APP_TMPDIR="$APP_BASE"/temp
 
-CONFIG="${APP_CONF}/config.yaml"
-if [[ ! -f "$CONFIG" ]] ; then
-  echo_r "ERROR: $CONFIG invalid or not found! please check.";
-  exit 1;
-fi
-
 # Ensure that any user defined CLASSPATH variables are not used on startup,
 # but allow them to be specified in setenv.sh, in rare case when it is needed.
 CLASSPATH=
@@ -247,13 +241,13 @@ if [ "$USE_NOHUP" = "true" ]; then
   NOHUP="nohup"
 fi
 
-PARAM_CLI="org.apache.streampark.flink.core.conf.ParameterCli"
+BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils"
 
 APP_MAIN="org.apache.streampark.console.StreamParkConsoleBootstrap"
 
 JVM_OPTS_FILE=${APP_HOME}/bin/jvm_opts.sh
 
-JVM_ARGS="-server"
+JVM_ARGS=""
 if [ -f $JVM_OPTS_FILE ]; then
   while read line
   do
@@ -284,18 +278,20 @@ print_logo() {
   printf '      %s   ──────── Apache StreamPark, Make stream processing easier 
ô~ô!%s\n\n'         $PRIMARY  $RESET
 }
 
-read_config() {
-  local prop_key=$1
-  local value
-  while IFS=':' read -r k v; do
-    k="${k/[[:space:]]/}"
-    v="${v/[[:space:]]/}"
-    if [[ ! $k = \#* ]] && [[ $k = $prop_key ]]; then
-      value=$v
-      break
+init_env() {
+  # shellcheck disable=SC2006
+  CONFIG="${APP_CONF}/application.yml"
+  if [[ -f "$CONFIG" ]] ; then
+    echo_y """[WARN] in the \"conf\" directory, found the \"application.yml\" 
file. The \"application.yml\" file is deprecated.
+       For compatibility, this application.yml will be used preferentially. 
The latest configuration file is \"config.yaml\". It is recommended to use 
\"config.yaml\".
+       Note: \"application.yml\" will be completely deprecated in version 
2.2.0. """
+  else
+    CONFIG="${APP_CONF}/config.yaml"
+    if [[ ! -f "$CONFIG" ]] ; then
+      echo_r "can not found config.yaml in \"conf\" directory, please check."
+      exit 1;
     fi
-  done < "$CONFIG"
-  echo "$value"
+  fi
 }
 
 # shellcheck disable=SC2120
@@ -316,7 +312,8 @@ get_pid() {
     fi
   fi
 
-  local serverPort=$(read_config "server.port")
+  # shellcheck disable=SC2006
+  local serverPort=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --yaml "server.port" 
"$CONFIG"`
   if [ x"${serverPort}" == x"" ]; then
     echo_r "server.port is required, please check $CONFIG"
     exit 1;
@@ -361,12 +358,14 @@ start() {
     echo_w "Using APP_PID:   $APP_PID"
   fi
 
-  local workspace=$(read_config "streampark.workspace.local")
-  if [[ ! -d $workspace ]]; then
-    echo_r "ERROR: streampark.workspace.local: \"$workspace\" is invalid path, 
Please check $CONFIG"
-    echo_r "NOTE: \"streampark.workspace.local\" Do not set under 
APP_HOME($APP_HOME). Set it to a secure directory outside of APP_HOME.  "
-    exit 1;
-  fi
+   # shellcheck disable=SC2006
+   local workspace=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --yaml 
"streampark.workspace.local" "$CONFIG"`
+   if [[ ! -d $workspace ]]; then
+     echo_r "ERROR: streampark.workspace.local: \"$workspace\" is invalid 
path, Please reconfigure in $CONFIG"
+     echo_r "NOTE: \"streampark.workspace.local\" Do not set under 
APP_HOME($APP_HOME). Set it to a secure directory outside of APP_HOME.  "
+     exit 1;
+   fi
+
   if [[ ! -w $workspace ]] || [[ ! -r $workspace ]]; then
       echo_r "ERROR: streampark.workspace.local: \"$workspace\" Permission 
denied! "
       exit 1;
@@ -403,9 +402,7 @@ start() {
   # shellcheck disable=SC2034
   # shellcheck disable=SC2006
   # shellcheck disable=SC2155
-  local ADD_OPENS=`$_RUNJAVA -cp "$APP_CLASSPATH" $PARAM_CLI --vmopt`
-
-  local JAVA_OPTS="$ADD_OPENS $JVM_OPTS $DEBUG_OPTS"
+  local JAVA_OPTS="$JVM_OPTS $DEBUG_OPTS"
 
   echo_g "JAVA_OPTS:  ${JAVA_OPTS}"
 
@@ -471,14 +468,9 @@ start_docker() {
     APP_CLASSPATH+=":${HADOOP_HOME}/etc/hadoop"
   fi
 
-  # shellcheck disable=SC2034
-  # shellcheck disable=SC2006
-  # shellcheck disable=SC2155
-  local ADD_OPENS=`$_RUNJAVA -cp "$APP_CLASSPATH" $PARAM_CLI --vmopt`
-
   JVM_OPTS="${JVM_OPTS} -XX:-UseContainerSupport"
 
-  local JAVA_OPTS="$ADD_OPENS $JVM_OPTS $DEBUG_OPTS"
+  local JAVA_OPTS="$JVM_OPTS $DEBUG_OPTS"
 
   echo_g "JAVA_OPTS:  ${JAVA_OPTS}"
 
@@ -573,6 +565,7 @@ restart() {
 
 main() {
   print_logo
+  init_env
   case "$1" in
     "debug")
         DEBUG_PORT=$2
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/MybatisConfig.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/MybatisConfig.java
index b1798193a..27730f5c5 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/MybatisConfig.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/MybatisConfig.java
@@ -56,7 +56,9 @@ public class MybatisConfig {
    * @return {@linkplain PostgreSQLQueryInterceptor}
    */
   @Bean
-  @ConditionalOnProperty(name = "datasource.dialect", havingValue = "pgsql")
+  @ConditionalOnProperty(
+      name = {"datasource.dialect", "spring.profiles.active"},
+      havingValue = "pgsql")
   public PostgreSQLQueryInterceptor postgreSQLQueryInterceptor() {
     return new PostgreSQLQueryInterceptor();
   }
@@ -67,7 +69,9 @@ public class MybatisConfig {
    * @return {@linkplain PostgreSQLPrepareInterceptor}
    */
   @Bean
-  @ConditionalOnProperty(name = "datasource.dialect", havingValue = "pgsql")
+  @ConditionalOnProperty(
+      name = {"datasource.dialect", "spring.profiles.active"},
+      havingValue = "pgsql")
   public PostgreSQLPrepareInterceptor postgreSQLPrepareInterceptor() {
     return new PostgreSQLPrepareInterceptor();
   }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java
index 8198c9289..48735f544 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java
@@ -18,11 +18,14 @@
 package org.apache.streampark.console.base.config;
 
 import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.common.util.SystemPropertyUtils;
 import org.apache.streampark.console.base.util.WebUtils;
 
 import org.apache.commons.lang3.StringUtils;
 
 import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.InputStream;
@@ -31,23 +34,30 @@ import java.util.Properties;
 
 public class SpringProperties {
 
-  public static Properties get() {
-    // 1) get spring config
-    Properties springConfig = getSpringConfig();
-
-    // 2) get user config
-    Properties userConfig = getUserConfig();
-
-    // 3) merge config
-    mergeConfig(userConfig, springConfig);
-
-    // 4) datasource
-    dataSourceConfig(userConfig, springConfig);
-
-    // 5) system.setProperties
-    springConfig.forEach((k, v) -> System.setProperty(k.toString(), 
v.toString()));
+  private static final Logger log = 
LoggerFactory.getLogger(SpringProperties.class);
 
-    return springConfig;
+  public static Properties get() {
+    File oldConfig = getOldConfig();
+    if (oldConfig != null) {
+      log.warn(
+          "in the \"conf\" directory, found the \"application.yml\" file. The 
\"application.yml\" file is deprecated. "
+              + "For compatibility, this \"application.yml\" file will be used 
preferentially. The latest configuration file is \"config.yaml\". "
+              + "It is recommended to use \"config.yaml\". Note: 
\"application.yml\" will be completely deprecated in version 2.2.0. ");
+      SystemPropertyUtils.set("spring.config.location", 
oldConfig.getAbsolutePath());
+      return new Properties();
+    } else {
+      // 1) get spring config
+      Properties springConfig = getSpringConfig();
+      // 2) get user config
+      Properties userConfig = getUserConfig();
+      // 3) merge config
+      mergeConfig(userConfig, springConfig);
+      // 4) datasource
+      dataSourceConfig(userConfig, springConfig);
+      // 5) system.setProperties
+      springConfig.forEach((k, v) -> System.setProperty(k.toString(), 
v.toString()));
+      return springConfig;
+    }
   }
 
   private static void dataSourceConfig(Properties userConfig, Properties 
springConfig) {
@@ -112,12 +122,21 @@ public class SpringProperties {
         });
   }
 
+  private static boolean useOldConfig() {
+    String appHome = WebUtils.getAppHome();
+    if (appHome == null) {
+      return false;
+    }
+    File file = new File(appHome + "/conf/application.yml");
+    return file.exists();
+  }
+
   private static Properties getUserConfig() {
     String appHome = WebUtils.getAppHome();
+    Properties properties = new Properties();
     if (appHome != null) {
       File file = new File(appHome + "/conf/config.yaml");
       if (file.exists() && file.isFile()) {
-        Properties properties = new Properties();
         Map<String, String> config = 
PropertiesUtils.fromYamlFileAsJava(file.getAbsolutePath());
         properties.putAll(config);
         return properties;
@@ -126,7 +145,6 @@ public class SpringProperties {
     } else {
       InputStream inputStream =
           
SpringProperties.class.getClassLoader().getResourceAsStream("config.yaml");
-      Properties properties = new Properties();
       Map<String, String> config = 
PropertiesUtils.fromYamlFileAsJava(inputStream);
       properties.putAll(config);
       return properties;
@@ -166,4 +184,16 @@ public class SpringProperties {
 
     return config;
   }
+
+  private static File getOldConfig() {
+    String appHome = WebUtils.getAppHome();
+    if (appHome == null) {
+      return null;
+    }
+    File file = new File(appHome + "/conf/application.yml");
+    if (file.exists()) {
+      return file;
+    }
+    return null;
+  }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java
new file mode 100644
index 000000000..ec6120cae
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.base.util;
+
+import org.apache.streampark.common.util.PropertiesUtils;
+
+import java.util.Arrays;
+import java.util.Map;
+
+public class BashJavaUtils {
+
+  public static void main(String[] args) {
+    String action = args[0].toLowerCase();
+    String[] actionArgs = Arrays.copyOfRange(args, 1, args.length);
+
+    switch (action) {
+      case "--yaml":
+        String key = actionArgs[0];
+        String conf = actionArgs[1];
+        Map<String, String> confMap = PropertiesUtils.fromYamlFileAsJava(conf);
+        String value = confMap.get(key);
+        System.out.println(value);
+        break;
+      default:
+        break;
+    }
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala
index 1206c817e..400acc47b 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala
@@ -34,90 +34,81 @@ object ParameterCli {
   private[this] val optionPrefix = KEY_FLINK_OPTION_PREFIX
   private[this] val optionMain = s"$propertyPrefix$$internal.application.main"
 
-  lazy val flinkOptions: Options = FlinkRunOption.allOptions
+  private lazy val flinkOptions: Options = FlinkRunOption.allOptions
 
   lazy val parser = new DefaultParser
 
   def main(args: Array[String]): Unit = print(read(args))
 
   def read(args: Array[String]): String = {
-    args(0) match {
-      case "--vmopt" =>
-        // solved jdk1.8+ dynamic loading of resources to the classpath problem
-        ClassLoader.getSystemClassLoader match {
-          case c if c.isInstanceOf[URLClassLoader] => ""
+    val action = args(0)
+    val conf = args(1)
+    val map = Try {
+      val extension = conf.split("\\.").last.toLowerCase
+      extension match {
+        case "yml" | "yaml" => PropertiesUtils.fromYamlFile(conf)
+        case "conf" => PropertiesUtils.fromHoconFile(conf)
+        case "properties" => PropertiesUtils.fromPropertiesFile(conf)
+        case _ =>
+          throw new IllegalArgumentException(
+            "[StreamPark] Usage:flink.conf file error,must be 
(yml|conf|properties)")
+      }
+    } match {
+      case Success(value) => value
+      case _ => Map.empty[String, String]
+    }
+    val programArgs = args.drop(2)
+    action match {
+      case "--option" =>
+        val option = getOption(map, programArgs)
+        val buffer = new StringBuffer()
+        Try {
+          val line = parser.parse(flinkOptions, option, false)
+          line.getOptions.foreach(
+            x => {
+              buffer.append(s" -${x.getOpt}")
+              if (x.hasArg) {
+                buffer.append(s" ${x.getValue()}")
+              }
+            })
+        } match {
+          case Failure(exception) => exception.printStackTrace()
           case _ =>
-            "--add-opens java.base/jdk.internal.loader=ALL-UNNAMED --add-opens 
jdk.zipfs/jdk.nio.zipfs=ALL-UNNAMED"
         }
-      case action =>
-        val conf = args(1)
-        val map = Try {
-          val extension = conf.split("\\.").last.toLowerCase
-          extension match {
-            case "yml" | "yaml" => PropertiesUtils.fromYamlFile(conf)
-            case "conf" => PropertiesUtils.fromHoconFile(conf)
-            case "properties" => PropertiesUtils.fromPropertiesFile(conf)
-            case _ =>
-              throw new IllegalArgumentException(
-                "[StreamPark] Usage:flink.conf file error,must be 
(yml|conf|properties)")
-          }
-        } match {
-          case Success(value) => value
-          case _ => Map.empty[String, String]
+        map.getOrElse(optionMain, null) match {
+          case null =>
+          case mainClass => buffer.append(s" -c $mainClass")
         }
-        val programArgs = args.drop(2)
-        action match {
-          case "--option" =>
-            val option = getOption(map, programArgs)
-            val buffer = new StringBuffer()
-            Try {
-              val line = parser.parse(flinkOptions, option, false)
-              line.getOptions.foreach(
-                x => {
-                  buffer.append(s" -${x.getOpt}")
-                  if (x.hasArg) {
-                    buffer.append(s" ${x.getValue()}")
-                  }
-                })
-            } match {
-              case Failure(exception) => exception.printStackTrace()
-              case _ =>
-            }
-            map.getOrElse(optionMain, null) match {
-              case null =>
-              case mainClass => buffer.append(s" -c $mainClass")
-            }
-            buffer.toString.trim
-          case "--property" =>
-            val buffer = new StringBuffer()
-            map
-              .filter(x => x._1 != optionMain && 
x._1.startsWith(propertyPrefix) && x._2.nonEmpty)
-              .foreach {
-                x =>
-                  val key = x._1.drop(propertyPrefix.length).trim
-                  val value = x._2.trim
-                  if (key == ConfigConst.KEY_FLINK_APP_NAME) {
-                    buffer.append(s" -D$key=${value.replace(" ", "_")}")
-                  } else {
-                    buffer.append(s" -D$key=$value")
-                  }
+        buffer.toString.trim
+      case "--property" =>
+        val buffer = new StringBuffer()
+        map
+          .filter(x => x._1 != optionMain && x._1.startsWith(propertyPrefix) 
&& x._2.nonEmpty)
+          .foreach {
+            x =>
+              val key = x._1.drop(propertyPrefix.length).trim
+              val value = x._2.trim
+              if (key == ConfigConst.KEY_FLINK_APP_NAME) {
+                buffer.append(s" -D$key=${value.replace(" ", "_")}")
+              } else {
+                buffer.append(s" -D$key=$value")
               }
-            buffer.toString.trim
-          case "--name" =>
-            
map.getOrElse(propertyPrefix.concat(ConfigConst.KEY_FLINK_APP_NAME), "").trim 
match {
-              case appName if appName.nonEmpty => appName
-              case _ => ""
-            }
-          // is detached mode
-          case "--detached" =>
-            val option = getOption(map, programArgs)
-            val line = parser.parse(FlinkRunOption.allOptions, option, false)
-            val detached = 
line.hasOption(FlinkRunOption.DETACHED_OPTION.getOpt) || line.hasOption(
-              FlinkRunOption.DETACHED_OPTION.getLongOpt)
-            val mode = if (detached) "Detached" else "Attach"
-            mode
-          case _ => null
+          }
+        buffer.toString.trim
+      case "--name" =>
+        map.getOrElse(propertyPrefix.concat(ConfigConst.KEY_FLINK_APP_NAME), 
"").trim match {
+          case appName if appName.nonEmpty => appName
+          case _ => ""
         }
+      // is detached mode
+      case "--detached" =>
+        val option = getOption(map, programArgs)
+        val line = parser.parse(FlinkRunOption.allOptions, option, false)
+        val detached = line.hasOption(FlinkRunOption.DETACHED_OPTION.getOpt) 
|| line.hasOption(
+          FlinkRunOption.DETACHED_OPTION.getLongOpt)
+        val mode = if (detached) "Detached" else "Attach"
+        mode
+      case _ => null
     }
   }
 

Reply via email to