This is an automated email from the ASF dual-hosted git repository.
gongzhongqiang pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
new c83200af8 [Improve] StreamPark Platform configuration files
improvements (#3653)
c83200af8 is described below
commit c83200af822681a0f24db577600d32a088b6c023
Author: benjobs <[email protected]>
AuthorDate: Tue Apr 2 17:47:58 2024 +0800
[Improve] StreamPark Platform configuration files improvements (#3653)
* [Improve] streampark console config improvements
* [Improve] minor improvements
* [Improve] config improvements
* [Improve] minor improvements
* [Improve] mybatisConfig improvements
* [Improve] logback config improvements
* [Improve] spring.datasource.driver-class-name improvements
* [Improve] PropertiesUtils null value improvement
* [Improve] spring conf improvements
* [Improve] config improvements
* [Improve] springTest ci failed fixed
* [Improve] testcase bug fixed.
---------
Co-authored-by: benjobs <[email protected]>
---
.../streampark/common/util/HadoopConfigUtils.scala | 39 ++++-
.../streampark/common/util/HadoopUtils.scala | 61 ++------
.../streampark/common/util/PropertiesUtils.scala | 14 +-
.../apache/streampark/common/util/YarnUtils.scala | 3 +-
.../src/main/assembly/assembly.xml | 5 +-
.../src/main/assembly/bin/streampark.sh | 105 +++++--------
.../console/StreamParkConsoleBootstrap.java | 14 +-
.../console/base/config/MybatisConfig.java | 4 +-
.../console/base/config/SpringProperties.java | 169 +++++++++++++++++++++
.../console/base/properties/ShiroProperties.java | 35 -----
.../console/system/authentication/JWTFilter.java | 16 --
.../console/system/authentication/JWTUtil.java | 37 ++++-
.../system/controller/PassportController.java | 5 +-
.../src/main/resources/application-h2.yml | 30 ----
.../src/main/resources/application-mysql.yml | 23 ---
.../src/main/resources/application-pgsql.yml | 23 ---
.../src/main/resources/application.yml | 149 ------------------
.../src/main/resources/config.yaml | 80 ++++++++++
.../src/main/resources/kerberos.yml | 26 ----
.../resources/{logback-spring.xml => logback.xml} | 76 ++++++---
.../apache/streampark/console/SpringTestBase.java | 29 +++-
.../kubernetes/ingress/IngressStrategyV1.scala | 31 ++--
.../ingress/IngressStrategyV1beta1.scala | 31 ++--
.../watcher/FlinkCheckpointWatcher.scala | 1 -
24 files changed, 510 insertions(+), 496 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
index 36936e54f..c0c3bc6b9 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
@@ -16,6 +16,8 @@
*/
package org.apache.streampark.common.util
+import org.apache.streampark.common.conf.{CommonConfig, ConfigConst,
InternalConfigHolder}
+import
org.apache.streampark.common.conf.ConfigConst.{KEY_SECURITY_KERBEROS_DEBUG,
KEY_SECURITY_KERBEROS_ENABLE, KEY_SECURITY_KERBEROS_KEYTAB,
KEY_SECURITY_KERBEROS_KRB5_CONF, KEY_SECURITY_KERBEROS_PRINCIPAL}
import org.apache.streampark.common.fs.LfsOperator
import org.apache.commons.io.{FileUtils => ApacheFileUtils}
@@ -32,10 +34,43 @@ import scala.util.{Failure, Success, Try}
/** Hadoop client configuration tools mainly for flink use. */
object HadoopConfigUtils {
- val HADOOP_CLIENT_CONF_FILES: Array[String] =
+ private[this] lazy val kerberosConf: Map[String, String] = {
+ val map =
System.getProperties.filter(_._1.startsWith("security.kerberos")).toMap
+ map match {
+ case m if m.nonEmpty => m
+ case _ =>
+ val appHome = SystemPropertyUtils.get(ConfigConst.KEY_APP_HOME, null)
+ val file = new File(s"$appHome/conf/kerberos.yml")
+ if (file.exists() && file.isFile) {
+ PropertiesUtils.fromYamlFile(file.getAbsolutePath)
+ } else {
+ Map.empty[String, String]
+ }
+ }
+ }
+
+ lazy val hadoopUserName: String =
+ InternalConfigHolder.get(CommonConfig.STREAMPARK_HADOOP_USER_NAME)
+
+ lazy val kerberosDebug =
+ kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_DEBUG, "false")
+
+ lazy val kerberosEnable =
+ kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_ENABLE, "false").toBoolean
+
+ lazy val kerberosPrincipal =
+ kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_PRINCIPAL, "").trim
+
+ val kerberosKeytab =
+ kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_KEYTAB, "").trim
+
+ val kerberosKrb5 =
+ kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_KRB5_CONF, "")
+
+ private val HADOOP_CLIENT_CONF_FILES: Array[String] =
Array("core-site.xml", "hdfs-site.xml", "yarn-site.xml")
- val HIVE_CLIENT_CONF_FILES: Array[String] =
+ private val HIVE_CLIENT_CONF_FILES: Array[String] =
Array("core-site.xml", "hdfs-site.xml", "hive-site.xml")
/** Get Hadoop configuration directory path from system. */
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
index cd54a4f81..d0417fafe 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.common.util
-import org.apache.streampark.common.conf.{CommonConfig, ConfigConst,
InternalConfigHolder}
+import org.apache.streampark.common.conf.{CommonConfig, InternalConfigHolder}
import org.apache.streampark.common.conf.ConfigConst._
import org.apache.commons.collections.CollectionUtils
@@ -58,47 +58,15 @@ object HadoopUtils extends Logger {
private[this] var tgt: KerberosTicket = _
- lazy val hadoopUserName: String =
- InternalConfigHolder.get(CommonConfig.STREAMPARK_HADOOP_USER_NAME)
-
- private[this] lazy val kerberosConf: Map[String, String] =
- SystemPropertyUtils.get(ConfigConst.KEY_APP_HOME, null) match {
- case null =>
- getClass.getResourceAsStream("/kerberos.yml") match {
- case x if x != null => PropertiesUtils.fromYamlFile(x)
- case _ => null
- }
- case f =>
- val file = new File(s"$f/conf/kerberos.yml")
- if (file.exists() && file.isFile) {
- PropertiesUtils.fromYamlFile(file.getAbsolutePath)
- } else null
- }
-
- private[this] lazy val kerberosDebug =
- kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_DEBUG, "false")
-
- private[this] lazy val kerberosEnable =
- kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_ENABLE, "false").toBoolean
-
- private[this] lazy val kerberosPrincipal =
- kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_PRINCIPAL, "").trim
-
- private[this] lazy val kerberosKeytab =
- kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_KEYTAB, "").trim
-
- private[this] lazy val kerberosKrb5 =
- kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_KRB5_CONF, "")
-
private[this] lazy val configurationCache: util.Map[String, Configuration] =
new ConcurrentHashMap[String, Configuration]()
def getUgi(): UserGroupInformation = {
if (ugi == null) {
- ugi = if (kerberosEnable) {
+ ugi = if (HadoopConfigUtils.kerberosEnable) {
getKerberosUGI()
} else {
- UserGroupInformation.createRemoteUser(hadoopUserName)
+ UserGroupInformation.createRemoteUser(HadoopConfigUtils.hadoopUserName)
}
}
ugi
@@ -199,24 +167,27 @@ object HadoopUtils extends Logger {
logInfo("kerberos login starting....")
require(
- kerberosPrincipal.nonEmpty && kerberosKeytab.nonEmpty,
- s"$KEY_SECURITY_KERBEROS_PRINCIPAL and $KEY_SECURITY_KERBEROS_KEYTAB
must not be empty")
+ HadoopConfigUtils.kerberosPrincipal.nonEmpty &&
HadoopConfigUtils.kerberosKeytab.nonEmpty,
+ s"$KEY_SECURITY_KERBEROS_PRINCIPAL and $KEY_SECURITY_KERBEROS_KEYTAB
must not be empty"
+ )
System.setProperty("javax.security.auth.useSubjectCredsOnly", "false")
- if (kerberosKrb5.nonEmpty) {
- System.setProperty("java.security.krb5.conf", kerberosKrb5)
- System.setProperty("java.security.krb5.conf.path", kerberosKrb5)
+ if (HadoopConfigUtils.kerberosKrb5.nonEmpty) {
+ System.setProperty("java.security.krb5.conf",
HadoopConfigUtils.kerberosKrb5)
+ System.setProperty("java.security.krb5.conf.path",
HadoopConfigUtils.kerberosKrb5)
}
- System.setProperty("sun.security.spnego.debug", kerberosDebug)
- System.setProperty("sun.security.krb5.debug", kerberosDebug)
+ System.setProperty("sun.security.spnego.debug",
HadoopConfigUtils.kerberosDebug)
+ System.setProperty("sun.security.krb5.debug",
HadoopConfigUtils.kerberosDebug)
hadoopConf.set(KEY_HADOOP_SECURITY_AUTHENTICATION, KEY_KERBEROS)
Try {
UserGroupInformation.setConfiguration(hadoopConf)
val ugi =
-
UserGroupInformation.loginUserFromKeytabAndReturnUGI(kerberosPrincipal,
kerberosKeytab)
+ UserGroupInformation.loginUserFromKeytabAndReturnUGI(
+ HadoopConfigUtils.kerberosPrincipal,
+ HadoopConfigUtils.kerberosKeytab)
UserGroupInformation.setLoginUser(ugi)
logInfo("kerberos authentication successful")
ugi
@@ -236,9 +207,7 @@ object HadoopUtils extends Logger {
})
} match {
case Success(fs) =>
- val enableString =
kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_ENABLE, "false")
- val kerberosEnable =
Try(enableString.trim.toBoolean).getOrElse(false)
- if (kerberosEnable) {
+ if (HadoopConfigUtils.kerberosEnable) {
// reLogin...
val timer = new Timer()
timer.schedule(
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index 35facd89f..03aa13145 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -31,6 +31,7 @@ import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+import scala.util.Try
object PropertiesUtils extends Logger {
@@ -70,13 +71,12 @@ object PropertiesUtils extends Logger {
})
.toMap
case text =>
- val value = text match {
- case null => ""
- case other => other.toString
- }
- prefix match {
- case "" => proper += k -> value
- case other => proper += s"$other.$k" -> value
+ if (text != null) {
+ val value = text.toString.trim
+ prefix match {
+ case "" => proper += k -> value
+ case other => proper += s"$other.$k" -> value
+ }
}
proper.toMap
}
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index e7b7b04b7..cbfb6e0aa 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -265,7 +265,8 @@ object YarnUtils extends Logger {
})
} else {
val url =
- if (!hasYarnHttpSimpleAuth) reqUrl else
s"$reqUrl?user.name=${HadoopUtils.hadoopUserName}"
+ if (!hasYarnHttpSimpleAuth) reqUrl
+ else s"$reqUrl?user.name=${HadoopConfigUtils.hadoopUserName}"
HttpClientUtils.httpGetRequest(url, config)
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/assembly.xml
b/streampark-console/streampark-console-service/src/main/assembly/assembly.xml
index 92c73dac4..6d46c1a44 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/assembly.xml
+++
b/streampark-console/streampark-console-service/src/main/assembly/assembly.xml
@@ -95,9 +95,8 @@
<lineEnding>unix</lineEnding>
<fileMode>0755</fileMode>
<includes>
- <include>*.yml</include>
- <include>*.properties</include>
- <include>logback*.xml</include>
+ <include>config.yaml</include>
+ <include>logback.xml</include>
</includes>
</fileSet>
<fileSet>
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
index 807d36ad3..41b9d0628 100755
---
a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
+++
b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
@@ -137,6 +137,12 @@ APP_OUT="$APP_LOG"/streampark.out
# shellcheck disable=SC2034
APP_TMPDIR="$APP_BASE"/temp
+CONFIG="${APP_CONF}/config.yaml"
+if [[ ! -f "$CONFIG" ]] ; then
+ echo_r "ERROR: $CONFIG invalid or not found! please check.";
+ exit 1;
+fi
+
# Ensure that any user defined CLASSPATH variables are not used on startup,
# but allow them to be specified in setenv.sh, in rare case when it is needed.
CLASSPATH=
@@ -278,21 +284,18 @@ print_logo() {
printf ' %s ──────── Apache StreamPark, Make stream processing easier
ô~ô!%s\n\n' $PRIMARY $RESET
}
-parse_yaml() {
- local prefix=$2
- local s='[[:space:]]*' w='[a-zA-Z0-9_]*' fs=$(echo @|tr @ '\034')
- sed -ne "s|^\($s\):|\1|" \
- -e "s|^\($s\)\($w\)$s:$s[\"']\(.*\)[\"']$s\$|\1$fs\2$fs\3|p" \
- -e "s|^\($s\)\($w\)$s:$s\(.*\)$s\$|\1$fs\2$fs\3|p" $1 |
- awk -F$fs '{
- indent = length($1)/2;
- vname[indent] = $2;
- for (i in vname) {if (i > indent) {delete vname[i]}}
- if (length($3) > 0) {
- vn=""; for (i=0; i<indent; i++) {vn=(vn)(vname[i])("_")}
- printf("%s%s%s=\"%s\"\n", "'$prefix'",vn, $2, $3);
- }
- }'
+read_config() {
+ local prop_key=$1
+ local value
+ while IFS=':' read -r k v; do
+ k="${k/[[:space:]]/}"
+ v="${v/[[:space:]]/}"
+ if [[ ! $k = \#* ]] && [[ $k = $prop_key ]]; then
+ value=$v
+ break
+ fi
+ done < "$CONFIG"
+ echo "$value"
}
# shellcheck disable=SC2120
@@ -313,32 +316,25 @@ get_pid() {
fi
fi
- # shellcheck disable=SC2006
- local PROPER="${APP_CONF}/application.yml"
- if [[ ! -f "$PROPER" ]] ; then
- echo_r "ERROR: config file application.yml invalid or not found! ";
+ local serverPort=$(read_config "server.port")
+ if [ x"${serverPort}" == x"" ]; then
+ echo_r "server.port is required, please check $CONFIG"
exit 1;
- fi
-
- # shellcheck disable=SC2046
- eval $(parse_yaml "${PROPER}" "conf_")
- # shellcheck disable=SC2154
- # shellcheck disable=SC2155
- # shellcheck disable=SC2116
- local serverPort=$(echo "$conf_server_port")
- # shellcheck disable=SC2006
- # shellcheck disable=SC2155
- local used=`lsof -i:"$serverPort" | wc -l`
- if [ "$used" -gt 0 ]; then
- # shellcheck disable=SC2006
- local PID=`jps -l | grep "$APP_MAIN" | awk '{print $1}'`
- if [ ! -z $PID ]; then
- echo $PID
- else
- echo 0
- fi
else
- echo 0
+ # shellcheck disable=SC2006
+ # shellcheck disable=SC2155
+ local used=`lsof -i:"$serverPort" | wc -l`
+ if [ "$used" -gt 0 ]; then
+ # shellcheck disable=SC2006
+ local PID=`jps -l | grep "$APP_MAIN" | awk '{print $1}'`
+ if [ ! -z $PID ]; then
+ echo $PID
+ else
+ echo 0
+ fi
+ else
+ echo 0
+ fi
fi
}
@@ -365,22 +361,9 @@ start() {
echo_w "Using APP_PID: $APP_PID"
fi
- local PROPER="${APP_CONF}/application.yml"
- if [[ ! -f "$PROPER" ]] ; then
- echo_r "ERROR: config file application.yml invalid or not found! ";
- exit 1;
- else
- echo_g "Usage: config file: $PROPER ";
- fi
-
- # shellcheck disable=SC2046
- eval $(parse_yaml "${PROPER}" "conf_")
- # shellcheck disable=SC2001
- # shellcheck disable=SC2154
- # shellcheck disable=SC2155
- local workspace=$(echo "$conf_streampark_workspace_local" | sed 's/#.*$//g')
+ local workspace=$(read_config "streampark.workspace.local")
if [[ ! -d $workspace ]]; then
- echo_r "ERROR: streampark.workspace.local: \"$workspace\" is invalid path,
Please reconfigure in application.yml"
+ echo_r "ERROR: streampark.workspace.local: \"$workspace\" is invalid path,
Please check $CONFIG"
echo_r "NOTE: \"streampark.workspace.local\" Do not set under
APP_HOME($APP_HOME). Set it to a secure directory outside of APP_HOME. "
exit 1;
fi
@@ -429,8 +412,7 @@ start() {
eval $NOHUP $_RUNJAVA $JAVA_OPTS \
-classpath "$APP_CLASSPATH" \
-Dapp.home="${APP_HOME}" \
- -Dlogging.config="${APP_CONF}/logback-spring.xml" \
- -Dspring.config.location="${PROPER}" \
+ -Dlogging.config="${APP_CONF}/logback.xml" \
-Djava.io.tmpdir="$APP_TMPDIR" \
$APP_MAIN >> "$APP_OUT" 2>&1 "&"
@@ -462,14 +444,6 @@ start_docker() {
echo_w "Using APP_PID: $APP_PID"
fi
- local PROPER="${APP_CONF}/application.yml"
- if [[ ! -f "$PROPER" ]] ; then
- echo_r "ERROR: config file application.yml invalid or not found! ";
- exit 1;
- else
- echo_g "Usage: config file: $PROPER ";
- fi
-
if [ "${HADOOP_HOME}"x == ""x ]; then
echo_y "WARN: HADOOP_HOME is undefined on your system env,please check it."
else
@@ -511,8 +485,7 @@ start_docker() {
$_RUNJAVA $JAVA_OPTS \
-classpath "$APP_CLASSPATH" \
-Dapp.home="${APP_HOME}" \
- -Dlogging.config="${APP_CONF}/logback-spring.xml" \
- -Dspring.config.location="${PROPER}" \
+ -Dlogging.config="${APP_CONF}/logback.xml" \
-Djava.io.tmpdir="$APP_TMPDIR" \
$APP_MAIN
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
index a9e3bf316..b54c99013 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java
@@ -17,9 +17,12 @@
package org.apache.streampark.console;
+import org.apache.streampark.console.base.config.SpringProperties;
+
import lombok.extern.slf4j.Slf4j;
-import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.builder.SpringApplicationBuilder;
+import
org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
@@ -44,9 +47,12 @@ import
org.springframework.scheduling.annotation.EnableScheduling;
@Slf4j
@SpringBootApplication
@EnableScheduling
-public class StreamParkConsoleBootstrap {
+public class StreamParkConsoleBootstrap extends SpringBootServletInitializer {
- public static void main(String[] args) {
- SpringApplication.run(StreamParkConsoleBootstrap.class, args);
+ public static void main(String[] args) throws Exception {
+ new SpringApplicationBuilder()
+ .properties(SpringProperties.get())
+ .sources(StreamParkConsoleBootstrap.class)
+ .run(args);
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/MybatisConfig.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/MybatisConfig.java
index 4ac1921e7..b1798193a 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/MybatisConfig.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/MybatisConfig.java
@@ -56,7 +56,7 @@ public class MybatisConfig {
* @return {@linkplain PostgreSQLQueryInterceptor}
*/
@Bean
- @ConditionalOnProperty(name = "spring.profiles.active", havingValue =
"pgsql")
+ @ConditionalOnProperty(name = "datasource.dialect", havingValue = "pgsql")
public PostgreSQLQueryInterceptor postgreSQLQueryInterceptor() {
return new PostgreSQLQueryInterceptor();
}
@@ -67,7 +67,7 @@ public class MybatisConfig {
* @return {@linkplain PostgreSQLPrepareInterceptor}
*/
@Bean
- @ConditionalOnProperty(name = "spring.profiles.active", havingValue =
"pgsql")
+ @ConditionalOnProperty(name = "datasource.dialect", havingValue = "pgsql")
public PostgreSQLPrepareInterceptor postgreSQLPrepareInterceptor() {
return new PostgreSQLPrepareInterceptor();
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java
new file mode 100644
index 000000000..8198c9289
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.base.config;
+
+import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.console.base.util.WebUtils;
+
+import org.apache.commons.lang3.StringUtils;
+
+import com.google.common.collect.Maps;
+
+import java.io.File;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Properties;
+
+public class SpringProperties {
+
+ public static Properties get() {
+ // 1) get spring config
+ Properties springConfig = getSpringConfig();
+
+ // 2) get user config
+ Properties userConfig = getUserConfig();
+
+ // 3) merge config
+ mergeConfig(userConfig, springConfig);
+
+ // 4) datasource
+ dataSourceConfig(userConfig, springConfig);
+
+ // 5) system.setProperties
+ springConfig.forEach((k, v) -> System.setProperty(k.toString(),
v.toString()));
+
+ return springConfig;
+ }
+
+ private static void dataSourceConfig(Properties userConfig, Properties
springConfig) {
+ String dialect = userConfig.getProperty("datasource.dialect",
"").toString();
+ if (StringUtils.isBlank(dialect)) {
+ throw new ExceptionInInitializerError(
+ "datasource.dialect is required, please check config.yaml");
+ }
+ switch (dialect.toLowerCase()) {
+ case "mysql":
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ springConfig.put("spring.datasource.driver-class-name",
"com.mysql.cj.jdbc.Driver");
+ } catch (ClassNotFoundException e) {
+ try {
+ Class.forName("com.mysql.jdbc.Driver");
+ springConfig.put("spring.datasource.driver-class-name",
"com.mysql.jdbc.Driver");
+ } catch (ClassNotFoundException e1) {
+ throw new ExceptionInInitializerError(
+ "datasource.dialect is mysql, \"com.mysql.cj.jdbc.Driver\" and
\"com.mysql.jdbc.Driver\" classes not found, Please ensure that the MySQL
Connector/J can be found under $streampark/lib,\n"
+ + "Notice: The MySQL Connector/J is incompatible with the
Apache 2.0 license, You need to download and put it into $streampark/lib");
+ }
+ }
+ break;
+ case "postgresql":
+ case "pgsql":
+ springConfig.put("spring.datasource.driver-class-name",
"org.postgresql.Driver");
+ break;
+ case "h2":
+ springConfig.put("spring.datasource.driver-class-name",
"org.h2.Driver");
+ springConfig.put("spring.datasource.username", "sa");
+ springConfig.put("spring.datasource.password", "sa");
+ springConfig.put(
+ "spring.datasource.url",
+
"jdbc:h2:mem:streampark;MODE=MySQL;DB_CLOSE_DELAY=-1;DATABASE_TO_LOWER=true;INIT=runscript
from 'classpath:db/schema-h2.sql'");
+ springConfig.put("spring.sql.init.data-locations",
"classpath:db/data-h2.sql");
+ springConfig.put("spring.sql.init.continue-on-error", "true");
+ springConfig.put("spring.sql.init.username", "sa");
+ springConfig.put("spring.sql.init.password", "sa");
+ springConfig.put("spring.sql.init.mode", "always");
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported datasource
dialect: " + dialect);
+ }
+ }
+
+ private static void mergeConfig(Properties userConfig, Properties
springConfig) {
+
+ Map<String, String> configMapping = Maps.newHashMap();
+ configMapping.put("datasource.username", "spring.datasource.username");
+ configMapping.put("datasource.password", "spring.datasource.password");
+ configMapping.put("datasource.url", "spring.datasource.url");
+
+ userConfig.forEach(
+ (k, v) -> {
+ String key = configMapping.get(k);
+ if (key != null) {
+ springConfig.put(key, v);
+ } else {
+ springConfig.put(k, v);
+ }
+ });
+ }
+
+ private static Properties getUserConfig() {
+ String appHome = WebUtils.getAppHome();
+ if (appHome != null) {
+ File file = new File(appHome + "/conf/config.yaml");
+ if (file.exists() && file.isFile()) {
+ Properties properties = new Properties();
+ Map<String, String> config =
PropertiesUtils.fromYamlFileAsJava(file.getAbsolutePath());
+ properties.putAll(config);
+ return properties;
+ }
+ throw new ExceptionInInitializerError(file.getAbsolutePath() + " not
found, please check.");
+ } else {
+ InputStream inputStream =
+
SpringProperties.class.getClassLoader().getResourceAsStream("config.yaml");
+ Properties properties = new Properties();
+ Map<String, String> config =
PropertiesUtils.fromYamlFileAsJava(inputStream);
+ properties.putAll(config);
+ return properties;
+ }
+ }
+
+ private static Properties getSpringConfig() {
+ Properties config = new Properties();
+ // basic
+ config.put("spring.application.name", "Apache StreamPark");
+ config.put("spring.main.banner-mode", "false");
+ config.put("spring.devtools.restart.enabled", "false");
+ config.put("spring.aop.proxy-target-class", "true");
+ config.put("spring.messages.encoding", "utf-8");
+ config.put("spring.main.allow-circular-references", "true");
+ config.put("spring.mvc.converters.preferred-json-mapper", "jackson");
+
+ // jackson
+ config.put("spring.jackson.date-format", "yyyy-MM-dd HH:mm:ss");
+ config.put("spring.jackson.time-zone", "GMT+8");
+ config.put("spring.jackson.deserialization.fail-on-unknown-properties",
"false");
+
+ // multipart
+ config.put("spring.servlet.multipart.enabled", "true");
+ config.put("spring.servlet.multipart.max-file-size", "-1");
+ config.put("spring.servlet.multipart.max-request-size", "-1");
+
+ // swagger-ui
+ config.put("knife4j.enable", "false");
+ config.put("knife4j.basic.enable", "false");
+ config.put("springdoc.api-docs.enabled", "false");
+ config.put("knife4j.basic.username", "admin");
+ config.put("knife4j.basic.password", "streampark");
+ config.put("springdoc.swagger-ui.path", "/swagger-ui.html");
+ config.put("springdoc.packages-to-scan", "org.apache.streampark.console");
+ config.put("spring.mvc.pathmatch.matching-strategy", "ant_path_matcher");
+
+ return config;
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/properties/ShiroProperties.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/properties/ShiroProperties.java
deleted file mode 100644
index f8e2a129b..000000000
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/properties/ShiroProperties.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.console.base.properties;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.context.annotation.Configuration;
-
-@Getter
-@Setter
-@Configuration
-@ConfigurationProperties(prefix = "streampark.shiro")
-public class ShiroProperties {
-
- private String anonUrl;
-
- /** token default effective time: 1d */
- private Long jwtTimeOut = 86400L;
-}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTFilter.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTFilter.java
index 6a597a1ec..9d3b40991 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTFilter.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTFilter.java
@@ -17,16 +17,12 @@
package org.apache.streampark.console.system.authentication;
-import org.apache.streampark.console.base.properties.ShiroProperties;
-import org.apache.streampark.console.base.util.SpringContextUtils;
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.enums.AuthenticationType;
-import org.apache.commons.lang3.StringUtils;
import org.apache.shiro.authz.UnauthorizedException;
import org.apache.shiro.web.filter.authc.BasicHttpAuthenticationFilter;
-import com.baomidou.mybatisplus.core.toolkit.StringPool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.util.AntPathMatcher;
@@ -48,18 +44,6 @@ public class JWTFilter extends BasicHttpAuthenticationFilter
{
protected boolean isAccessAllowed(
ServletRequest request, ServletResponse response, Object mappedValue)
throws UnauthorizedException {
- HttpServletRequest httpServletRequest = (HttpServletRequest) request;
- ShiroProperties properties =
SpringContextUtils.getBean(ShiroProperties.class);
- String[] anonUrl =
- StringUtils.splitByWholeSeparatorPreserveAllTokens(
- properties.getAnonUrl(), StringPool.COMMA);
-
- for (String u : anonUrl) {
- if (pathMatcher.match(u.trim(), httpServletRequest.getRequestURI())) {
- return true;
- }
- }
-
if (isLoginAttempt(request, response)) {
return executeLogin(request, response);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java
index 5f89c0f9a..8b033d512 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java
@@ -17,8 +17,6 @@
package org.apache.streampark.console.system.authentication;
-import org.apache.streampark.console.base.properties.ShiroProperties;
-import org.apache.streampark.console.base.util.SpringContextUtils;
import org.apache.streampark.console.core.enums.AuthenticationType;
import com.auth0.jwt.JWT;
@@ -28,12 +26,12 @@ import com.auth0.jwt.interfaces.DecodedJWT;
import lombok.extern.slf4j.Slf4j;
import java.util.Date;
+import java.util.regex.Pattern;
@Slf4j
public class JWTUtil {
- private static final long JWT_TIME_OUT =
- SpringContextUtils.getBean(ShiroProperties.class).getJwtTimeOut() * 1000;
+ private static Long ttlOfSecond;
/**
* verify token
@@ -90,7 +88,9 @@ public class JWTUtil {
*/
public static String sign(
Long userId, String userName, String secret, AuthenticationType
authType) {
- return sign(userId, userName, secret, authType, getExpireTime());
+ Long second = getTTLOfSecond() * 1000;
+ Long ttl = System.currentTimeMillis() + second;
+ return sign(userId, userName, secret, authType, ttl);
}
/**
@@ -113,8 +113,29 @@ public class JWTUtil {
.sign(algorithm);
}
- /** get token expire timestamp */
- private static Long getExpireTime() {
- return System.currentTimeMillis() + JWT_TIME_OUT;
+ public static Long getTTLOfSecond() {
+ if (ttlOfSecond == null) {
+ String ttl = System.getProperty("server.session.ttl", "24h").trim();
+ String regexp = "^\\d+(s|m|h|d)$";
+ Pattern pattern = Pattern.compile(regexp);
+ if (!pattern.matcher(ttl).matches()) {
+ throw new IllegalArgumentException(
+ "server.session.ttl is invalid, Time units must be [s|m|h|d], e.g:
24h, 2d... please check config.yaml ");
+ }
+ String unit = ttl.substring(ttl.length() - 1);
+ String time = ttl.substring(0, ttl.length() - 1);
+ Long second = Long.parseLong(time);
+ switch (unit) {
+ case "m":
+ return ttlOfSecond = second * 60;
+ case "h":
+ return ttlOfSecond = second * 60 * 60;
+ case "d":
+ return ttlOfSecond = second * 24 * 60 * 60;
+ default:
+ return ttlOfSecond = second;
+ }
+ }
+ return ttlOfSecond;
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/PassportController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/PassportController.java
index ee0fc8f39..5334e68e2 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/PassportController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/PassportController.java
@@ -20,7 +20,6 @@ package org.apache.streampark.console.system.controller;
import org.apache.streampark.common.util.DateUtils;
import org.apache.streampark.console.base.domain.ResponseCode;
import org.apache.streampark.console.base.domain.RestResponse;
-import org.apache.streampark.console.base.properties.ShiroProperties;
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.enums.AuthenticationType;
import org.apache.streampark.console.system.authentication.JWTToken;
@@ -56,8 +55,6 @@ public class PassportController {
@Autowired private UserService userService;
- @Autowired private ShiroProperties properties;
-
@Autowired private Authenticator authenticator;
@Operation(summary = "Signin")
@@ -95,7 +92,7 @@ public class PassportController {
this.userService.updateLoginTime(username);
String sign = JWTUtil.sign(user.getUserId(), username, user.getSalt(),
AuthenticationType.SIGN);
- LocalDateTime expireTime =
LocalDateTime.now().plusSeconds(properties.getJwtTimeOut());
+ LocalDateTime expireTime =
LocalDateTime.now().plusSeconds(JWTUtil.getTTLOfSecond());
String ttl = DateUtils.formatFullTime(expireTime);
// shiro login
diff --git
a/streampark-console/streampark-console-service/src/main/resources/application-h2.yml
b/streampark-console/streampark-console-service/src/main/resources/application-h2.yml
deleted file mode 100644
index 1a469c8ab..000000000
---
a/streampark-console/streampark-console-service/src/main/resources/application-h2.yml
+++ /dev/null
@@ -1,30 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-spring:
- datasource:
- driver-class-name: org.h2.Driver
- url:
jdbc:h2:mem:streampark;MODE=MySQL;DB_CLOSE_DELAY=-1;DATABASE_TO_LOWER=true;INIT=runscript
from 'classpath:db/schema-h2.sql'
- username: sa
- password: sa
- sql:
- init:
- data-locations: classpath:db/data-h2.sql
- continue-on-error: true
- username: sa
- password: sa
- mode: always
diff --git
a/streampark-console/streampark-console-service/src/main/resources/application-mysql.yml
b/streampark-console/streampark-console-service/src/main/resources/application-mysql.yml
deleted file mode 100644
index 986ed37cf..000000000
---
a/streampark-console/streampark-console-service/src/main/resources/application-mysql.yml
+++ /dev/null
@@ -1,23 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-spring:
- datasource:
- username: root
- password: streampark
- driver-class-name: com.mysql.cj.jdbc.Driver
- url:
jdbc:mysql://localhost:3306/streampark?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8
diff --git
a/streampark-console/streampark-console-service/src/main/resources/application-pgsql.yml
b/streampark-console/streampark-console-service/src/main/resources/application-pgsql.yml
deleted file mode 100644
index 9c71b81c5..000000000
---
a/streampark-console/streampark-console-service/src/main/resources/application-pgsql.yml
+++ /dev/null
@@ -1,23 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-spring:
- datasource:
- username: postgres
- password: streampark
- driver-class-name: org.postgresql.Driver
- url: jdbc:postgresql://localhost:5432/streampark?stringtype=unspecified
diff --git
a/streampark-console/streampark-console-service/src/main/resources/application.yml
b/streampark-console/streampark-console-service/src/main/resources/application.yml
deleted file mode 100644
index 223fa0ff3..000000000
---
a/streampark-console/streampark-console-service/src/main/resources/application.yml
+++ /dev/null
@@ -1,149 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-server:
- port: 10000
- undertow:
- buffer-size: 1024
- direct-buffers: true
- threads:
- io: 4
- worker: 20
-
-logging:
- level:
- root: info
-
-knife4j:
- enable: true
- basic:
- # basic authentication, used to access swagger-ui and doc
- enable: false
- username: admin
- password: streampark
-
-springdoc:
- api-docs:
- enabled: true
- swagger-ui:
- path: /swagger-ui.html
- packages-to-scan: org.apache.streampark.console
-
-spring:
- profiles.active: h2 #[h2,pgsql,mysql]
- application.name: StreamPark
- devtools.restart.enabled: false
- mvc.pathmatch.matching-strategy: ant_path_matcher
- servlet:
- multipart:
- enabled: true
- max-file-size: 500MB
- max-request-size: 500MB
- aop.proxy-target-class: true
- messages.encoding: utf-8
- jackson:
- date-format: yyyy-MM-dd HH:mm:ss
- time-zone: GMT+8
- deserialization:
- fail-on-unknown-properties: false
- main:
- allow-circular-references: true
- banner-mode: off
- mvc:
- converters:
- preferred-json-mapper: jackson
-
-management:
- endpoints:
- web:
- exposure:
- include: [ 'health', 'httptrace', 'metrics' ]
- endpoint:
- health:
- enabled: true
- show-details: always
- probes:
- enabled: true
- health:
- ldap:
- enabled: false
-
-streampark:
- proxy:
- # knox process address
https://cdpsit02.example.cn:8443/gateway/cdp-proxy/yarn
- yarn-url:
- # lark alert proxy,default https://open.feishu.cn
- lark-url:
- yarn:
- # default simple, or kerberos
- http-auth: simple
-
- # HADOOP_USER_NAME
- hadoop-user-name: hdfs
- # local workspace, used to store source code and build dir etc.
- workspace:
- local: /opt/streampark_workspace
- remote: hdfs:///streampark # support hdfs:///streampark/ 、 /streampark
、hdfs://host:ip/streampark/
-
- # remote docker register namespace for streampark
- docker:
- # instantiating DockerHttpClient
- http-client:
- max-connections: 10000
- connection-timeout-sec: 10000
- response-timeout-sec: 12000
- docker-host: ""
-
- # flink-k8s tracking configuration
- flink-k8s:
- tracking:
- silent-state-keep-sec: 10
- polling-task-timeout-sec:
- job-status: 120
- cluster-metric: 120
- polling-interval-sec:
- job-status: 2
- cluster-metric: 3
- # If you need to specify an ingress controller, you can use this.
- ingress:
- class: nginx
-
- # packer garbage resources collection configuration
- packer-gc:
- # maximum retention time for temporary build resources
- max-resource-expired-hours: 120
- # gc task running interval hours
- exec-cron: 0 0 0/6 * * ?
-
- shiro:
- # token timeout, unit second
- jwtTimeOut: 86400
- # backend authentication-free resources url
- anonUrl: >
-
-ldap:
- # Is ldap enabled? If so, please modify the urls
- enable: false
- ## AD server IP, default port 389
- urls: ldap://99.99.99.99:389
- ## Login Account
- base-dn: dc=streampark,dc=com
- username: cn=Manager,dc=streampark,dc=com
- password: streampark
- user:
- identity-attribute: uid
- email-attribute: mail
diff --git
a/streampark-console/streampark-console-service/src/main/resources/config.yaml
b/streampark-console/streampark-console-service/src/main/resources/config.yaml
new file mode 100644
index 000000000..b52179568
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/resources/config.yaml
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# logging level
+logging.level.root: info
+# server port
+server.port: 10000
+# The user's login session has a validity period. If it exceeds this time, the
user will be automatically logged out.
+# unit: s|m|h|d, s: second, m: minute, h: hour, d: day
+server.session.ttl: 2h # unit: [s|m|h|d], e.g: 24h, 2d
+
+# see:
https://github.com/undertow-io/undertow/blob/master/core/src/main/java/io/undertow/Undertow.java
+server.undertow.direct-buffers: true
+server.undertow.buffer-size: 1024
+server.undertow.threads.io: 16
+server.undertow.threads.worker: 256
+
+# system database dialect, default: h2, supported: h2|mysql|pgsql
+datasource.dialect: h2 # h2 | mysql | pgsql
+# if datasource.dialect is mysql or pgsql, you need to configure the following
connection information
+# mysql/postgresql connect access user
+datasource.username:
+# mysql/postgresql connect access password
+datasource.password:
+# mysql/postgresql connection address
+# mysql jdbc url example: datasource.url:
jdbc:mysql://localhost:3306/streampark?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8
+# postgresql jdbc url example:
jdbc:postgresql://localhost:5432/streampark?stringtype=unspecified
+datasource.url:
+#---------------------------------------------------------------------------------
+
+# Local workspace, used to store cloned projects and compiled projects. Do not
set it under $APP_HOME; set it to a directory outside of $APP_HOME.
+streampark.workspace.local: /tmp/streampark
+# The root hdfs path of the jars, Same as yarn.provided.lib.dirs for flink on
yarn-application
+# and Same as --jars for spark on yarn
+streampark.workspace.remote: hdfs:///streampark/
+# hadoop yarn proxy path, e.g: knox process address
https://streampark.com:8443/proxy/yarn
+streampark.proxy.yarn-url:
+# lark proxy address, default https://open.feishu.cn
+streampark.proxy.lark-url:
+# flink on yarn or spark on yarn, monitoring job status from yarn, it is
necessary to set hadoop.http.authentication.type
+streampark.yarn.http-auth: simple # default simple, or kerberos
+# flink on yarn or spark on yarn, it is necessary to set the hadoop user name
+streampark.hadoop-user-name: hdfs
+# flink on k8s ingress setting, If an ingress controller is specified in the
configuration, the ingress class
+# kubernetes.io/ingress.class must be specified when creating the ingress,
since there are often
+# multiple ingress controllers in a production environment.
+streampark.flink-k8s.ingress.class: nginx
+
+# sign streampark with ldap.
+ldap.enable: false # whether ldap is enabled
+ldap.urls: ldap://99.99.99.99:389 # AD server IP, default port 389
+ldap.base-dn: dc=streampark,dc=com # Login Account
+ldap.username: cn=Manager,dc=streampark,dc=com
+ldap.password: streampark
+ldap.user.identity-attribute: uid
+ldap.user.email-attribute: mail
+
+# flink on yarn or spark on yarn, when the hadoop cluster enable kerberos
authentication,
+# it is necessary to set up Kerberos authentication related parameters.
+security.kerberos.login.enable: false
+security.kerberos.login.debug: false
+# kerberos principal path
+security.kerberos.login.principal:
+security.kerberos.login.krb5:
+security.kerberos.login.keytab:
+security.kerberos.ttl: 2h # unit [s|m|h|d]
diff --git
a/streampark-console/streampark-console-service/src/main/resources/kerberos.yml
b/streampark-console/streampark-console-service/src/main/resources/kerberos.yml
deleted file mode 100644
index b8199054b..000000000
---
a/streampark-console/streampark-console-service/src/main/resources/kerberos.yml
+++ /dev/null
@@ -1,26 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-security:
- kerberos:
- ttl: 2h # unit [s|m|h|d]
- login:
- enable: false
- debug: false
- principal:
- krb5:
- keytab:
diff --git
a/streampark-console/streampark-console-service/src/main/resources/logback-spring.xml
b/streampark-console/streampark-console-service/src/main/resources/logback.xml
similarity index 55%
rename from
streampark-console/streampark-console-service/src/main/resources/logback-spring.xml
rename to
streampark-console/streampark-console-service/src/main/resources/logback.xml
index 27334797a..c7daf7a0e 100644
---
a/streampark-console/streampark-console-service/src/main/resources/logback-spring.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/logback.xml
@@ -17,33 +17,38 @@
-->
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<contextName>StreamPark</contextName>
- <property name="log.path" value="${app.home}/logs"/>
- <property name="log.maxHistory" value="15"/>
- <property name="log.colorPattern"
+ <property name="LOG_HOME" value="${app.home}/logs"/>
+ <property name="LOG_MAX_HISTORY" value="15"/>
value="%d{yyyy-MM-dd HH:mm:ss} | %highlight(%-5level) |
%boldYellow(%thread) | %boldGreen(%logger):%L] %msg%n"/>
- <property name="log.pattern"
+ <property name="LOG_PATTERN"
value="%d{yyyy-MM-dd HH:mm:ss.SSS} %contextName [%thread]
%-5level %logger{36}:%L - %msg%n"/>
+ <property name="COLOR_PATTERN"
+ value="%d{yyyy-MM-dd HH:mm:ss} | %highlight(%-5level) |
%boldYellow(%thread) | %boldGreen(%logger):%L] %msg%n"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
- <pattern>${log.colorPattern}</pattern>
+ <pattern>${COLOR_PATTERN}</pattern>
+ <charset>UTF-8</charset>
</encoder>
</appender>
<appender name="INFO"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<level>INFO</level>
- <onMatch>ACCEPT</onMatch>
- <onMismatch>DENY</onMismatch>
+ <onMatch>DENY</onMatch>
+ <onMismatch>ACCEPT</onMismatch>
</filter>
+ <file>${LOG_HOME}/info.log</file>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+
<fileNamePattern>${LOG_HOME}/info.%d{yyyy-MM-dd}_%i.log</fileNamePattern>
+ <maxHistory>${LOG_MAX_HISTORY}</maxHistory>
+ <maxFileSize>50MB</maxFileSize>
+ <totalSizeCap>1GB</totalSizeCap>
+ </rollingPolicy>
<encoder>
+ <pattern>${LOG_PATTERN}</pattern>
<charset>UTF-8</charset>
- <pattern>${log.pattern}</pattern>
</encoder>
- <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-
<fileNamePattern>${log.path}/info.%d{yyyy-MM-dd}.log</fileNamePattern>
- <MaxHistory>${log.maxHistory}</MaxHistory>
- </rollingPolicy>
</appender>
<appender name="ERROR"
class="ch.qos.logback.core.rolling.RollingFileAppender">
@@ -52,12 +57,16 @@
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
- <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-
<fileNamePattern>${log.path}/error.%d{yyyy-MM-dd}.log</fileNamePattern>
+ <file>${LOG_HOME}/error.log</file>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+
<fileNamePattern>${LOG_HOME}/error.%d{yyyy-MM-dd}_%i.log</fileNamePattern>
+ <maxHistory>${LOG_MAX_HISTORY}</maxHistory>
+ <maxFileSize>50MB</maxFileSize>
+ <totalSizeCap>1GB</totalSizeCap>
</rollingPolicy>
<encoder>
+ <pattern>${LOG_PATTERN}</pattern>
<charset>UTF-8</charset>
- <pattern>${log.pattern}</pattern>
</encoder>
</appender>
@@ -67,12 +76,16 @@
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
- <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-
<fileNamePattern>${log.path}/warn.%d{yyyy-MM-dd}.log</fileNamePattern>
+ <file>${LOG_HOME}/warn.log</file>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+
<fileNamePattern>${LOG_HOME}/warn.%d{yyyy-MM-dd}_%i.log</fileNamePattern>
+ <maxHistory>${LOG_MAX_HISTORY}</maxHistory>
+ <maxFileSize>50MB</maxFileSize>
+ <totalSizeCap>1GB</totalSizeCap>
</rollingPolicy>
<encoder>
+ <pattern>${LOG_PATTERN}</pattern>
<charset>UTF-8</charset>
- <pattern>${log.pattern}</pattern>
</encoder>
</appender>
@@ -82,12 +95,16 @@
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
- <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-
<fileNamePattern>${log.path}/debug.%d{yyyy-MM-dd}.log</fileNamePattern>
+ <file>${LOG_HOME}/debug.log</file>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+
<fileNamePattern>${LOG_HOME}/debug.%d{yyyy-MM-dd}_%i.log</fileNamePattern>
+ <maxHistory>${LOG_MAX_HISTORY}</maxHistory>
+ <maxFileSize>50MB</maxFileSize>
+ <totalSizeCap>1GB</totalSizeCap>
</rollingPolicy>
<encoder>
+ <pattern>${LOG_PATTERN}</pattern>
<charset>UTF-8</charset>
- <pattern>${log.pattern}</pattern>
</encoder>
</appender>
@@ -97,15 +114,26 @@
<onMatch>ACCEPT</onMatch>
<onMismatch>DENY</onMismatch>
</filter>
- <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-
<fileNamePattern>${log.path}/trace.%d{yyyy-MM-dd}.log</fileNamePattern>
+ <file>${LOG_HOME}/trace.log</file>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+
<fileNamePattern>${LOG_HOME}/trace.%d{yyyy-MM-dd}_%i.log</fileNamePattern>
+ <maxHistory>${LOG_MAX_HISTORY}</maxHistory>
+ <maxFileSize>50MB</maxFileSize>
+ <totalSizeCap>1GB</totalSizeCap>
</rollingPolicy>
<encoder>
+ <pattern>${LOG_PATTERN}</pattern>
<charset>UTF-8</charset>
- <pattern>${log.pattern}</pattern>
</encoder>
</appender>
+ <!-- log level -->
+ <logger name="org.springframework.web" level="INFO"/>
+ <logger name="org.apache.ibatis" level="TRACE"/>
+ <logger name="java.sql.Connection" level="DEBUG"/>
+ <logger name="java.sql.Statement" level="DEBUG"/>
+ <logger name="java.sql.PreparedStatement" level="DEBUG"/>
+
<root level="INFO">
<appender-ref ref="STDOUT"/>
<appender-ref ref="INFO"/>
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringTestBase.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringTestBase.java
index 30043f3e9..8ca6eeb06 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringTestBase.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringTestBase.java
@@ -34,7 +34,6 @@ import
org.springframework.boot.test.autoconfigure.orm.jpa.AutoConfigureTestEnti
import
org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
-import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.transaction.annotation.Transactional;
@@ -46,14 +45,36 @@ import java.util.Date;
/** base tester. */
@Transactional
-@ActiveProfiles("test")
@AutoConfigureTestEntityManager
@AutoConfigureWebTestClient(timeout = "60000")
-@TestPropertySource(locations = {"classpath:application-test.yml"})
@ExtendWith({MockitoExtension.class, SpringExtension.class})
+@ActiveProfiles("test")
@SpringBootTest(
classes = StreamParkConsoleBootstrap.class,
- webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
+ webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT,
+ properties = {
+ "server.port=10000",
+ "spring.application.name=Apache StreamPark",
+ "spring.main.banner-mode=false",
+ "spring.aop.proxy-target-class=true",
+ "spring.messages.encoding=utf-8",
+ "spring.main.allow-circular-references=true",
+ "spring.mvc.converters.preferred-json-mapper=jackson",
+ "spring.jackson.date-format=yyyy-MM-dd HH:mm:ss",
+ "spring.jackson.time-zone=GMT+8",
+ "spring.jackson.deserialization.fail-on-unknown-properties=false",
+ "spring.mvc.pathmatch.matching-strategy=ant_path_matcher",
+ "datasource.dialect=h2",
+ "spring.datasource.driver-class-name=org.h2.Driver",
+ "spring.datasource.username=sa",
+ "spring.datasource.password=sa",
+
"spring.datasource.url=jdbc:h2:mem:streampark;MODE=MySQL;DB_CLOSE_DELAY=-1;DATABASE_TO_LOWER=true;INIT=runscript
from 'classpath:db/schema-h2.sql'",
+ "spring.sql.init.data-locations=classpath:db/data-h2.sql",
+ "spring.sql.init.continue-on-error=true",
+ "spring.sql.init.username=sa",
+ "spring.sql.init.password=sa",
+ "spring.sql.init.mode=always"
+ })
public abstract class SpringTestBase {
protected static final Logger LOG =
LoggerFactory.getLogger(SpringTestBase.class);
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
index 1ac15bfdc..a54216d6a 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
@@ -23,9 +23,10 @@ import
io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.apache.flink.client.program.ClusterClient
+import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.language.postfixOps
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
class IngressStrategyV1 extends IngressStrategy {
@@ -35,17 +36,25 @@ class IngressStrategyV1 extends IngressStrategy {
Utils.using(new DefaultKubernetesClient) {
client =>
Try {
-
Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
- .map(ingress => ingress.getSpec.getRules.get(0))
- .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
- .map { case (host, path) => s"http://$host$path" }
- .getOrElse {
- Utils.using(clusterClient)(client => client.getWebInterfaceURL)
- }
- }.recover {
- case e =>
+ val ingress =
+
Try(client.network.v1.ingresses().inNamespace(nameSpace).withName(clusterId).get())
+ .getOrElse(null)
+ if (ingress == null) {
+ Utils.using(clusterClient)(client => client.getWebInterfaceURL)
+ } else {
+ Option(ingress)
+ .map(ingress => ingress.getSpec.getRules.head)
+ .map(rule => rule.getHost -> rule.getHttp.getPaths.head.getPath)
+ .map { case (host, path) => s"http://$host$path" }
+ .getOrElse {
+ Utils.using(clusterClient)(client => client.getWebInterfaceURL)
+ }
+ }
+ } match {
+ case Success(value) => value
+ case Failure(e) =>
throw new RuntimeException(s"[StreamPark] get ingressUrlAddress
error: $e")
- }.get
+ }
}
}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
index 408210a0c..20ffdab8d 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
@@ -25,9 +25,10 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.apache.commons.lang3.StringUtils
import org.apache.flink.client.program.ClusterClient
+import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.language.postfixOps
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
class IngressStrategyV1beta1 extends IngressStrategy {
@@ -37,17 +38,25 @@ class IngressStrategyV1beta1 extends IngressStrategy {
Utils.using(new DefaultKubernetesClient) {
client =>
Try {
-
Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
- .map(ingress => ingress.getSpec.getRules.get(0))
- .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
- .map { case (host, path) => s"http://$host$path" }
- .getOrElse {
- Utils.using(clusterClient)(client => client.getWebInterfaceURL)
- }
- }.recover {
- case e =>
+ val ingress =
+
Try(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
+ .getOrElse(null)
+ if (ingress == null) {
+ Utils.using(clusterClient)(client => client.getWebInterfaceURL)
+ } else {
+ Option(ingress)
+ .map(ingress => ingress.getSpec.getRules.head)
+ .map(rule => rule.getHost -> rule.getHttp.getPaths.head.getPath)
+ .map { case (host, path) => s"http://$host$path" }
+ .getOrElse {
+ Utils.using(clusterClient)(client => client.getWebInterfaceURL)
+ }
+ }
+ } match {
+ case Success(value) => value
+ case Failure(e) =>
throw new RuntimeException(s"[StreamPark] get ingressUrlAddress
error: $e")
- }.get
+ }
}
}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
index 0b344d764..6335379e2 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
@@ -23,7 +23,6 @@ import
org.apache.streampark.flink.kubernetes.event.FlinkJobCheckpointChangeEven
import org.apache.streampark.flink.kubernetes.model.{CheckpointCV, ClusterKey,
TrackId}
import org.apache.hc.client5.http.fluent.Request
-import org.apache.hc.core5.util.Timeout
import org.json4s.{DefaultFormats, JNull}
import org.json4s.JsonAST.JNothing
import org.json4s.jackson.JsonMethods.parse