This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
new 8bd8fc090 [Improve] config.yaml configuration file compatibility
improvements
8bd8fc090 is described below
commit 8bd8fc0903f892159eeecc44f562cfae672d7f14
Author: benjobs <[email protected]>
AuthorDate: Sat Apr 6 15:25:53 2024 +0800
[Improve] config.yaml configuration file compatibility improvements
---
.../src/main/assembly/bin/jvm_opts.sh | 4 +
.../src/main/assembly/bin/streampark.sh | 63 +++++-----
.../console/base/config/MybatisConfig.java | 8 +-
.../console/base/config/SpringProperties.java | 66 +++++++---
.../console/base/util/BashJavaUtils.java | 43 +++++++
.../streampark/flink/core/conf/ParameterCli.scala | 137 ++++++++++-----------
6 files changed, 193 insertions(+), 128 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/bin/jvm_opts.sh
b/streampark-console/streampark-console-service/src/main/assembly/bin/jvm_opts.sh
index 93fc9083e..9a7bdff7d 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/bin/jvm_opts.sh
+++
b/streampark-console/streampark-console-service/src/main/assembly/bin/jvm_opts.sh
@@ -17,6 +17,7 @@
# specific language governing permissions and limitations
# under the License.
+-server
-Xms1g
-Xmx1g
-Xmn512m
@@ -33,3 +34,6 @@
-XX:GCLogFileSize=50M
-XX:NumberOfGCLogFiles=10
+# Works around the issue of dynamically loading resources onto the classpath on JDKs newer than
1.8; if your JDK version is > 1.8, you can enable the options below
+#--add-opens java.base/jdk.internal.loader=ALL-UNNAMED --add-opens
jdk.zipfs/jdk.nio.zipfs=ALL-UNNAMED
+
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
index 41b9d0628..4bfbaf89b 100755
---
a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
+++
b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
@@ -137,12 +137,6 @@ APP_OUT="$APP_LOG"/streampark.out
# shellcheck disable=SC2034
APP_TMPDIR="$APP_BASE"/temp
-CONFIG="${APP_CONF}/config.yaml"
-if [[ ! -f "$CONFIG" ]] ; then
- echo_r "ERROR: $CONFIG invalid or not found! please check.";
- exit 1;
-fi
-
# Ensure that any user defined CLASSPATH variables are not used on startup,
# but allow them to be specified in setenv.sh, in rare case when it is needed.
CLASSPATH=
@@ -247,13 +241,13 @@ if [ "$USE_NOHUP" = "true" ]; then
NOHUP="nohup"
fi
-PARAM_CLI="org.apache.streampark.flink.core.conf.ParameterCli"
+BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils"
APP_MAIN="org.apache.streampark.console.StreamParkConsoleBootstrap"
JVM_OPTS_FILE=${APP_HOME}/bin/jvm_opts.sh
-JVM_ARGS="-server"
+JVM_ARGS=""
if [ -f $JVM_OPTS_FILE ]; then
while read line
do
@@ -284,18 +278,20 @@ print_logo() {
printf ' %s ──────── Apache StreamPark, Make stream processing easier
ô~ô!%s\n\n' $PRIMARY $RESET
}
-read_config() {
- local prop_key=$1
- local value
- while IFS=':' read -r k v; do
- k="${k/[[:space:]]/}"
- v="${v/[[:space:]]/}"
- if [[ ! $k = \#* ]] && [[ $k = $prop_key ]]; then
- value=$v
- break
+init_env() {
+ # shellcheck disable=SC2006
+ CONFIG="${APP_CONF}/application.yml"
+ if [[ -f "$CONFIG" ]] ; then
+ echo_y """[WARN] in the \"conf\" directory, found the \"application.yml\"
file. The \"application.yml\" file is deprecated.
+ For compatibility, this application.yml will be used preferentially.
The latest configuration file is \"config.yaml\". It is recommended to use
\"config.yaml\".
+ Note: \"application.yml\" will be completely deprecated in version
2.2.0. """
+ else
+ CONFIG="${APP_CONF}/config.yaml"
+ if [[ ! -f "$CONFIG" ]] ; then
+ echo_r "can not found config.yaml in \"conf\" directory, please check."
+ exit 1;
fi
- done < "$CONFIG"
- echo "$value"
+ fi
}
# shellcheck disable=SC2120
@@ -316,7 +312,8 @@ get_pid() {
fi
fi
- local serverPort=$(read_config "server.port")
+ # shellcheck disable=SC2006
+ local serverPort=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --yaml "server.port"
"$CONFIG"`
if [ x"${serverPort}" == x"" ]; then
echo_r "server.port is required, please check $CONFIG"
exit 1;
@@ -361,12 +358,14 @@ start() {
echo_w "Using APP_PID: $APP_PID"
fi
- local workspace=$(read_config "streampark.workspace.local")
- if [[ ! -d $workspace ]]; then
- echo_r "ERROR: streampark.workspace.local: \"$workspace\" is invalid path,
Please check $CONFIG"
- echo_r "NOTE: \"streampark.workspace.local\" Do not set under
APP_HOME($APP_HOME). Set it to a secure directory outside of APP_HOME. "
- exit 1;
- fi
+ # shellcheck disable=SC2006
+ local workspace=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --yaml
"streampark.workspace.local" "$CONFIG"`
+ if [[ ! -d $workspace ]]; then
+ echo_r "ERROR: streampark.workspace.local: \"$workspace\" is invalid
path, Please reconfigure in $CONFIG"
+ echo_r "NOTE: \"streampark.workspace.local\" Do not set under
APP_HOME($APP_HOME). Set it to a secure directory outside of APP_HOME. "
+ exit 1;
+ fi
+
if [[ ! -w $workspace ]] || [[ ! -r $workspace ]]; then
echo_r "ERROR: streampark.workspace.local: \"$workspace\" Permission
denied! "
exit 1;
@@ -403,9 +402,7 @@ start() {
# shellcheck disable=SC2034
# shellcheck disable=SC2006
# shellcheck disable=SC2155
- local ADD_OPENS=`$_RUNJAVA -cp "$APP_CLASSPATH" $PARAM_CLI --vmopt`
-
- local JAVA_OPTS="$ADD_OPENS $JVM_OPTS $DEBUG_OPTS"
+ local JAVA_OPTS="$JVM_OPTS $DEBUG_OPTS"
echo_g "JAVA_OPTS: ${JAVA_OPTS}"
@@ -471,14 +468,9 @@ start_docker() {
APP_CLASSPATH+=":${HADOOP_HOME}/etc/hadoop"
fi
- # shellcheck disable=SC2034
- # shellcheck disable=SC2006
- # shellcheck disable=SC2155
- local ADD_OPENS=`$_RUNJAVA -cp "$APP_CLASSPATH" $PARAM_CLI --vmopt`
-
JVM_OPTS="${JVM_OPTS} -XX:-UseContainerSupport"
- local JAVA_OPTS="$ADD_OPENS $JVM_OPTS $DEBUG_OPTS"
+ local JAVA_OPTS="$JVM_OPTS $DEBUG_OPTS"
echo_g "JAVA_OPTS: ${JAVA_OPTS}"
@@ -573,6 +565,7 @@ restart() {
main() {
print_logo
+ init_env
case "$1" in
"debug")
DEBUG_PORT=$2
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/MybatisConfig.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/MybatisConfig.java
index b1798193a..27730f5c5 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/MybatisConfig.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/MybatisConfig.java
@@ -56,7 +56,9 @@ public class MybatisConfig {
* @return {@linkplain PostgreSQLQueryInterceptor}
*/
@Bean
- @ConditionalOnProperty(name = "datasource.dialect", havingValue = "pgsql")
+ // Register this interceptor when EITHER "datasource.dialect" OR "spring.profiles.active"
+ // is "pgsql". A single @ConditionalOnProperty listing both names only matches when ALL
+ // listed properties equal havingValue, so a setup that defines just one of them would
+ // never activate the bean. The ":" supplies an empty default for an unset property.
+ @org.springframework.boot.autoconfigure.condition.ConditionalOnExpression(
+ "'${datasource.dialect:}' == 'pgsql' or '${spring.profiles.active:}' == 'pgsql'")
public PostgreSQLQueryInterceptor postgreSQLQueryInterceptor() {
return new PostgreSQLQueryInterceptor();
}
@@ -67,7 +69,9 @@ public class MybatisConfig {
* @return {@linkplain PostgreSQLPrepareInterceptor}
*/
@Bean
- @ConditionalOnProperty(name = "datasource.dialect", havingValue = "pgsql")
+ // Register this interceptor when EITHER "datasource.dialect" OR "spring.profiles.active"
+ // is "pgsql". A single @ConditionalOnProperty listing both names only matches when ALL
+ // listed properties equal havingValue, so a setup that defines just one of them would
+ // never activate the bean. The ":" supplies an empty default for an unset property.
+ @org.springframework.boot.autoconfigure.condition.ConditionalOnExpression(
+ "'${datasource.dialect:}' == 'pgsql' or '${spring.profiles.active:}' == 'pgsql'")
public PostgreSQLPrepareInterceptor postgreSQLPrepareInterceptor() {
return new PostgreSQLPrepareInterceptor();
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java
index 8198c9289..48735f544 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java
@@ -18,11 +18,14 @@
package org.apache.streampark.console.base.config;
import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.common.util.SystemPropertyUtils;
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.InputStream;
@@ -31,23 +34,30 @@ import java.util.Properties;
public class SpringProperties {
- public static Properties get() {
- // 1) get spring config
- Properties springConfig = getSpringConfig();
-
- // 2) get user config
- Properties userConfig = getUserConfig();
-
- // 3) merge config
- mergeConfig(userConfig, springConfig);
-
- // 4) datasource
- dataSourceConfig(userConfig, springConfig);
-
- // 5) system.setProperties
- springConfig.forEach((k, v) -> System.setProperty(k.toString(),
v.toString()));
+ private static final Logger log =
LoggerFactory.getLogger(SpringProperties.class);
- return springConfig;
+ public static Properties get() {
+ File oldConfig = getOldConfig();
+ if (oldConfig != null) {
+ log.warn(
+ "in the \"conf\" directory, found the \"application.yml\" file. The
\"application.yml\" file is deprecated. "
+ + "For compatibility, this \"application.yml\" file will be used
preferentially. The latest configuration file is \"config.yaml\". "
+ + "It is recommended to use \"config.yaml\". Note:
\"application.yml\" will be completely deprecated in version 2.2.0. ");
+ SystemPropertyUtils.set("spring.config.location",
oldConfig.getAbsolutePath());
+ return new Properties();
+ } else {
+ // 1) get spring config
+ Properties springConfig = getSpringConfig();
+ // 2) get user config
+ Properties userConfig = getUserConfig();
+ // 3) merge config
+ mergeConfig(userConfig, springConfig);
+ // 4) datasource
+ dataSourceConfig(userConfig, springConfig);
+ // 5) system.setProperties
+ springConfig.forEach((k, v) -> System.setProperty(k.toString(),
v.toString()));
+ return springConfig;
+ }
}
private static void dataSourceConfig(Properties userConfig, Properties
springConfig) {
@@ -112,12 +122,21 @@ public class SpringProperties {
});
}
+ /**
+  * Reports whether the legacy {@code application.yml} is present under {@code <app home>/conf}.
+  *
+  * <p>NOTE(review): no caller of this method is visible in this change, and it appears to
+  * overlap with {@code getOldConfig()} - confirm whether it is still needed.
+  *
+  * @return {@code true} when the app home is known and the file exists, otherwise {@code false}
+  */
+ private static boolean useOldConfig() {
+ String home = WebUtils.getAppHome();
+ // Short-circuit keeps the "no app home" case from ever building a path.
+ return home != null && new File(home + "/conf/application.yml").exists();
+ }
+
private static Properties getUserConfig() {
String appHome = WebUtils.getAppHome();
+ Properties properties = new Properties();
if (appHome != null) {
File file = new File(appHome + "/conf/config.yaml");
if (file.exists() && file.isFile()) {
- Properties properties = new Properties();
Map<String, String> config =
PropertiesUtils.fromYamlFileAsJava(file.getAbsolutePath());
properties.putAll(config);
return properties;
@@ -126,7 +145,6 @@ public class SpringProperties {
} else {
InputStream inputStream =
SpringProperties.class.getClassLoader().getResourceAsStream("config.yaml");
- Properties properties = new Properties();
Map<String, String> config =
PropertiesUtils.fromYamlFileAsJava(inputStream);
properties.putAll(config);
return properties;
@@ -166,4 +184,16 @@ public class SpringProperties {
return config;
}
+
+ /**
+  * Locates the legacy {@code application.yml} under {@code <app home>/conf}.
+  *
+  * @return the file when the app home is known and the path is an existing regular file,
+  *     otherwise {@code null}
+  */
+ private static File getOldConfig() {
+ String appHome = WebUtils.getAppHome();
+ if (appHome == null) {
+ return null;
+ }
+ File file = new File(appHome + "/conf/application.yml");
+ // Require a regular file (same test as getUserConfig and the `-f` check in streampark.sh):
+ // a directory that happens to be named application.yml must not be treated as the
+ // configuration and passed on as spring.config.location.
+ if (file.exists() && file.isFile()) {
+ return file;
+ }
+ return null;
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java
new file mode 100644
index 000000000..ec6120cae
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.base.util;
+
+import org.apache.streampark.common.util.PropertiesUtils;
+
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Command-line helper meant to be called from shell scripts to read configuration values.
+ *
+ * <p>Usage: {@code BashJavaUtils --yaml <key> <yaml-file>} prints the value stored under
+ * {@code key} on standard output. When the key is absent an empty line is printed, so a
+ * shell caller capturing the output can detect the missing value with an empty-string test.
+ */
+public class BashJavaUtils {
+
+  private static final String USAGE = "Usage: BashJavaUtils --yaml <key> <yaml-file>";
+
+  /**
+   * Entry point.
+   *
+   * @param args the action (currently only {@code --yaml}) followed by the action's arguments
+   */
+  public static void main(String[] args) {
+    if (args.length == 0) {
+      // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
+      System.err.println(USAGE);
+      System.exit(1);
+    }
+    String action = args[0].toLowerCase();
+    String[] actionArgs = Arrays.copyOfRange(args, 1, args.length);
+
+    switch (action) {
+      case "--yaml":
+        if (actionArgs.length < 2) {
+          System.err.println(USAGE);
+          System.exit(1);
+        }
+        String key = actionArgs[0];
+        String conf = actionArgs[1];
+        Map<String, String> confMap = PropertiesUtils.fromYamlFileAsJava(conf);
+        String value = confMap == null ? null : confMap.get(key);
+        // println(null) would emit the literal text "null", which a shell caller cannot
+        // distinguish from a real value; emit an empty line for a missing key instead.
+        System.out.println(value == null ? "" : value);
+        break;
+      default:
+        // Diagnostics go to stderr only, so captured stdout stays empty as before.
+        System.err.println("Unsupported action: " + action + ". " + USAGE);
+        break;
+    }
+  }
+}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala
index 1206c817e..400acc47b 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala
@@ -34,90 +34,81 @@ object ParameterCli {
private[this] val optionPrefix = KEY_FLINK_OPTION_PREFIX
private[this] val optionMain = s"$propertyPrefix$$internal.application.main"
- lazy val flinkOptions: Options = FlinkRunOption.allOptions
+ private lazy val flinkOptions: Options = FlinkRunOption.allOptions
lazy val parser = new DefaultParser
def main(args: Array[String]): Unit = print(read(args))
def read(args: Array[String]): String = {
- args(0) match {
- case "--vmopt" =>
- // solved jdk1.8+ dynamic loading of resources to the classpath problem
- ClassLoader.getSystemClassLoader match {
- case c if c.isInstanceOf[URLClassLoader] => ""
+ val action = args(0)
+ val conf = args(1)
+ val map = Try {
+ val extension = conf.split("\\.").last.toLowerCase
+ extension match {
+ case "yml" | "yaml" => PropertiesUtils.fromYamlFile(conf)
+ case "conf" => PropertiesUtils.fromHoconFile(conf)
+ case "properties" => PropertiesUtils.fromPropertiesFile(conf)
+ case _ =>
+ throw new IllegalArgumentException(
+ "[StreamPark] Usage:flink.conf file error,must be
(yml|conf|properties)")
+ }
+ } match {
+ case Success(value) => value
+ case _ => Map.empty[String, String]
+ }
+ val programArgs = args.drop(2)
+ action match {
+ case "--option" =>
+ val option = getOption(map, programArgs)
+ val buffer = new StringBuffer()
+ Try {
+ val line = parser.parse(flinkOptions, option, false)
+ line.getOptions.foreach(
+ x => {
+ buffer.append(s" -${x.getOpt}")
+ if (x.hasArg) {
+ buffer.append(s" ${x.getValue()}")
+ }
+ })
+ } match {
+ case Failure(exception) => exception.printStackTrace()
case _ =>
- "--add-opens java.base/jdk.internal.loader=ALL-UNNAMED --add-opens
jdk.zipfs/jdk.nio.zipfs=ALL-UNNAMED"
}
- case action =>
- val conf = args(1)
- val map = Try {
- val extension = conf.split("\\.").last.toLowerCase
- extension match {
- case "yml" | "yaml" => PropertiesUtils.fromYamlFile(conf)
- case "conf" => PropertiesUtils.fromHoconFile(conf)
- case "properties" => PropertiesUtils.fromPropertiesFile(conf)
- case _ =>
- throw new IllegalArgumentException(
- "[StreamPark] Usage:flink.conf file error,must be
(yml|conf|properties)")
- }
- } match {
- case Success(value) => value
- case _ => Map.empty[String, String]
+ map.getOrElse(optionMain, null) match {
+ case null =>
+ case mainClass => buffer.append(s" -c $mainClass")
}
- val programArgs = args.drop(2)
- action match {
- case "--option" =>
- val option = getOption(map, programArgs)
- val buffer = new StringBuffer()
- Try {
- val line = parser.parse(flinkOptions, option, false)
- line.getOptions.foreach(
- x => {
- buffer.append(s" -${x.getOpt}")
- if (x.hasArg) {
- buffer.append(s" ${x.getValue()}")
- }
- })
- } match {
- case Failure(exception) => exception.printStackTrace()
- case _ =>
- }
- map.getOrElse(optionMain, null) match {
- case null =>
- case mainClass => buffer.append(s" -c $mainClass")
- }
- buffer.toString.trim
- case "--property" =>
- val buffer = new StringBuffer()
- map
- .filter(x => x._1 != optionMain &&
x._1.startsWith(propertyPrefix) && x._2.nonEmpty)
- .foreach {
- x =>
- val key = x._1.drop(propertyPrefix.length).trim
- val value = x._2.trim
- if (key == ConfigConst.KEY_FLINK_APP_NAME) {
- buffer.append(s" -D$key=${value.replace(" ", "_")}")
- } else {
- buffer.append(s" -D$key=$value")
- }
+ buffer.toString.trim
+ case "--property" =>
+ val buffer = new StringBuffer()
+ map
+ .filter(x => x._1 != optionMain && x._1.startsWith(propertyPrefix)
&& x._2.nonEmpty)
+ .foreach {
+ x =>
+ val key = x._1.drop(propertyPrefix.length).trim
+ val value = x._2.trim
+ if (key == ConfigConst.KEY_FLINK_APP_NAME) {
+ buffer.append(s" -D$key=${value.replace(" ", "_")}")
+ } else {
+ buffer.append(s" -D$key=$value")
}
- buffer.toString.trim
- case "--name" =>
-
map.getOrElse(propertyPrefix.concat(ConfigConst.KEY_FLINK_APP_NAME), "").trim
match {
- case appName if appName.nonEmpty => appName
- case _ => ""
- }
- // is detached mode
- case "--detached" =>
- val option = getOption(map, programArgs)
- val line = parser.parse(FlinkRunOption.allOptions, option, false)
- val detached =
line.hasOption(FlinkRunOption.DETACHED_OPTION.getOpt) || line.hasOption(
- FlinkRunOption.DETACHED_OPTION.getLongOpt)
- val mode = if (detached) "Detached" else "Attach"
- mode
- case _ => null
+ }
+ buffer.toString.trim
+ case "--name" =>
+ map.getOrElse(propertyPrefix.concat(ConfigConst.KEY_FLINK_APP_NAME),
"").trim match {
+ case appName if appName.nonEmpty => appName
+ case _ => ""
}
+ // is detached mode
+ case "--detached" =>
+ val option = getOption(map, programArgs)
+ val line = parser.parse(FlinkRunOption.allOptions, option, false)
+ val detached = line.hasOption(FlinkRunOption.DETACHED_OPTION.getOpt)
|| line.hasOption(
+ FlinkRunOption.DETACHED_OPTION.getLongOpt)
+ val mode = if (detached) "Detached" else "Attach"
+ mode
+ case _ => null
}
}