This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch conf in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 80ef114c2719ede6890227faf4a12a25d29a9092 Author: benjobs <[email protected]> AuthorDate: Sat Mar 30 12:42:09 2024 +0800 [Improve] streampark console config improvements --- .../streampark/common/util/HadoopConfigUtils.scala | 39 +++++- .../streampark/common/util/HadoopUtils.scala | 61 ++------- .../apache/streampark/common/util/YarnUtils.scala | 3 +- .../src/main/assembly/bin/streampark.sh | 2 - .../console/StreamParkConsoleBootstrap.java | 16 ++- .../console/base/config/SpringProperties.java | 151 +++++++++++++++++++++ .../console/base/properties/ShiroProperties.java | 35 ----- .../console/system/authentication/JWTFilter.java | 16 --- .../console/system/authentication/JWTUtil.java | 37 +++-- .../system/controller/PassportController.java | 5 +- .../src/main/resources/application-h2.yml | 30 ---- .../src/main/resources/application-mysql.yml | 23 ---- .../src/main/resources/application-pgsql.yml | 23 ---- .../src/main/resources/application.yml | 149 -------------------- .../src/main/resources/config.yaml | 68 ++++++++++ .../kubernetes/ingress/IngressStrategyV1.scala | 31 +++-- .../ingress/IngressStrategyV1beta1.scala | 31 +++-- 17 files changed, 355 insertions(+), 365 deletions(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala index 36936e54f..c0c3bc6b9 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala @@ -16,6 +16,8 @@ */ package org.apache.streampark.common.util +import org.apache.streampark.common.conf.{CommonConfig, ConfigConst, InternalConfigHolder} +import org.apache.streampark.common.conf.ConfigConst.{KEY_SECURITY_KERBEROS_DEBUG, KEY_SECURITY_KERBEROS_ENABLE, KEY_SECURITY_KERBEROS_KEYTAB, KEY_SECURITY_KERBEROS_KRB5_CONF, KEY_SECURITY_KERBEROS_PRINCIPAL} import org.apache.streampark.common.fs.LfsOperator import org.apache.commons.io.{FileUtils => ApacheFileUtils} @@ -32,10 +34,43 @@ import scala.util.{Failure, Success, Try} /** Hadoop client configuration tools mainly for flink use. */ object HadoopConfigUtils { - val HADOOP_CLIENT_CONF_FILES: Array[String] = + private[this] lazy val kerberosConf: Map[String, String] = { + val map = System.getProperties.filter(_._1.startsWith("security.kerberos")).toMap + map match { + case m if m.nonEmpty => m + case _ => + val appHome = SystemPropertyUtils.get(ConfigConst.KEY_APP_HOME, null) + val file = new File(s"$appHome/conf/kerberos.yml") + if (file.exists() && file.isFile) { + PropertiesUtils.fromYamlFile(file.getAbsolutePath) + } else { + Map.empty[String, String] + } + } + } + + lazy val hadoopUserName: String = + InternalConfigHolder.get(CommonConfig.STREAMPARK_HADOOP_USER_NAME) + + lazy val kerberosDebug = + kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_DEBUG, "false") + + lazy val kerberosEnable = + kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_ENABLE, "false").toBoolean + + lazy val kerberosPrincipal = + kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_PRINCIPAL, "").trim + + val kerberosKeytab = + kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_KEYTAB, "").trim + + val kerberosKrb5 = + kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_KRB5_CONF, "") + + private val HADOOP_CLIENT_CONF_FILES: Array[String] = Array("core-site.xml", "hdfs-site.xml", "yarn-site.xml") - val HIVE_CLIENT_CONF_FILES: Array[String] = + private val HIVE_CLIENT_CONF_FILES: Array[String] = Array("core-site.xml", "hdfs-site.xml", "hive-site.xml") /** Get Hadoop configuration directory path from system. */ diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala index cd54a4f81..d0417fafe 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala @@ -17,7 +17,7 @@ package org.apache.streampark.common.util -import org.apache.streampark.common.conf.{CommonConfig, ConfigConst, InternalConfigHolder} +import org.apache.streampark.common.conf.{CommonConfig, InternalConfigHolder} import org.apache.streampark.common.conf.ConfigConst._ import org.apache.commons.collections.CollectionUtils @@ -58,47 +58,15 @@ object HadoopUtils extends Logger { private[this] var tgt: KerberosTicket = _ - lazy val hadoopUserName: String = - InternalConfigHolder.get(CommonConfig.STREAMPARK_HADOOP_USER_NAME) - - private[this] lazy val kerberosConf: Map[String, String] = - SystemPropertyUtils.get(ConfigConst.KEY_APP_HOME, null) match { - case null => - getClass.getResourceAsStream("/kerberos.yml") match { - case x if x != null => PropertiesUtils.fromYamlFile(x) - case _ => null - } - case f => - val file = new File(s"$f/conf/kerberos.yml") - if (file.exists() && file.isFile) { - PropertiesUtils.fromYamlFile(file.getAbsolutePath) - } else null - } - - private[this] lazy val kerberosDebug = - kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_DEBUG, "false") - - private[this] lazy val kerberosEnable = - kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_ENABLE, "false").toBoolean - - private[this] lazy val kerberosPrincipal = - kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_PRINCIPAL, "").trim - - private[this] lazy val kerberosKeytab = - kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_KEYTAB, "").trim - - private[this] lazy val kerberosKrb5 = - kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_KRB5_CONF, "") - private[this] lazy val configurationCache: util.Map[String, Configuration] = new ConcurrentHashMap[String, Configuration]() def getUgi(): UserGroupInformation = { if (ugi == null) { - ugi = if (kerberosEnable) { + ugi = if (HadoopConfigUtils.kerberosEnable) { getKerberosUGI() } else { - UserGroupInformation.createRemoteUser(hadoopUserName) + UserGroupInformation.createRemoteUser(HadoopConfigUtils.hadoopUserName) } } ugi @@ -199,24 +167,27 @@ object HadoopUtils extends Logger { logInfo("kerberos login starting....") require( - kerberosPrincipal.nonEmpty && kerberosKeytab.nonEmpty, - s"$KEY_SECURITY_KERBEROS_PRINCIPAL and $KEY_SECURITY_KERBEROS_KEYTAB must not be empty") + HadoopConfigUtils.kerberosPrincipal.nonEmpty && HadoopConfigUtils.kerberosKeytab.nonEmpty, + s"$KEY_SECURITY_KERBEROS_PRINCIPAL and $KEY_SECURITY_KERBEROS_KEYTAB must not be empty" + ) System.setProperty("javax.security.auth.useSubjectCredsOnly", "false") - if (kerberosKrb5.nonEmpty) { - System.setProperty("java.security.krb5.conf", kerberosKrb5) - System.setProperty("java.security.krb5.conf.path", kerberosKrb5) + if (HadoopConfigUtils.kerberosKrb5.nonEmpty) { + System.setProperty("java.security.krb5.conf", HadoopConfigUtils.kerberosKrb5) + System.setProperty("java.security.krb5.conf.path", HadoopConfigUtils.kerberosKrb5) } - System.setProperty("sun.security.spnego.debug", kerberosDebug) - System.setProperty("sun.security.krb5.debug", kerberosDebug) + System.setProperty("sun.security.spnego.debug", HadoopConfigUtils.kerberosDebug) + System.setProperty("sun.security.krb5.debug", HadoopConfigUtils.kerberosDebug) hadoopConf.set(KEY_HADOOP_SECURITY_AUTHENTICATION, KEY_KERBEROS) Try { UserGroupInformation.setConfiguration(hadoopConf) val ugi = - UserGroupInformation.loginUserFromKeytabAndReturnUGI(kerberosPrincipal, kerberosKeytab) + UserGroupInformation.loginUserFromKeytabAndReturnUGI( + HadoopConfigUtils.kerberosPrincipal, + HadoopConfigUtils.kerberosKeytab) UserGroupInformation.setLoginUser(ugi) logInfo("kerberos authentication successful") ugi @@ -236,9 +207,7 @@ object HadoopUtils extends Logger { }) } match { case Success(fs) => - val enableString = kerberosConf.getOrElse(KEY_SECURITY_KERBEROS_ENABLE, "false") - val kerberosEnable = Try(enableString.trim.toBoolean).getOrElse(false) - if (kerberosEnable) { + if (HadoopConfigUtils.kerberosEnable) { // reLogin... val timer = new Timer() timer.schedule( diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala index e7b7b04b7..cbfb6e0aa 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala @@ -265,7 +265,8 @@ object YarnUtils extends Logger { }) } else { val url = - if (!hasYarnHttpSimpleAuth) reqUrl else s"$reqUrl?user.name=${HadoopUtils.hadoopUserName}" + if (!hasYarnHttpSimpleAuth) reqUrl + else s"$reqUrl?user.name=${HadoopConfigUtils.hadoopUserName}" HttpClientUtils.httpGetRequest(url, config) } } diff --git a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh index 807d36ad3..90ac0471e 100755 --- a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh +++ b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh @@ -511,8 +511,6 @@ start_docker() { $_RUNJAVA $JAVA_OPTS \ -classpath "$APP_CLASSPATH" \ -Dapp.home="${APP_HOME}" \ - -Dlogging.config="${APP_CONF}/logback-spring.xml" \ - -Dspring.config.location="${PROPER}" \ -Djava.io.tmpdir="$APP_TMPDIR" \ $APP_MAIN diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java index a9e3bf316..21c17e6d0 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/StreamParkConsoleBootstrap.java @@ -17,11 +17,16 @@ package org.apache.streampark.console; +import org.apache.streampark.console.base.config.SpringProperties; + import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.boot.web.servlet.support.SpringBootServletInitializer; import org.springframework.scheduling.annotation.EnableScheduling; +import java.util.Map; + /** * * @@ -44,9 +49,12 @@ import org.springframework.scheduling.annotation.EnableScheduling; @Slf4j @SpringBootApplication @EnableScheduling -public class StreamParkConsoleBootstrap { +public class StreamParkConsoleBootstrap extends SpringBootServletInitializer { - public static void main(String[] args) { - SpringApplication.run(StreamParkConsoleBootstrap.class, args); + public static void main(String[] args) throws Exception { + Map<String, Object> properties = SpringProperties.getProperties(); + properties.forEach((k, v) -> System.setProperty(k, v.toString())); + SpringApplicationBuilder builder = new SpringApplicationBuilder().properties(properties); + builder.sources(StreamParkConsoleBootstrap.class).run(args); } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java new file mode 100644 index 000000000..eff97497c --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/SpringProperties.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.streampark.console.base.config; + +import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.util.PropertiesUtils; +import org.apache.streampark.common.util.SystemPropertyUtils; + +import org.apache.commons.lang3.StringUtils; + +import com.google.common.collect.Maps; + +import java.io.File; +import java.io.InputStream; +import java.util.Map; + +public class SpringProperties { + + public static Map<String, Object> getProperties() { + // 1) get spring config + Map<String, Object> springConfig = getSpringConfig(); + + // 2) get user config + Map<String, String> userConfig = getUserConfig(); + + // 3) merge config + mergeConfig(userConfig, springConfig); + + // 4) datasource + dataSourceConfig(userConfig, springConfig); + + return springConfig; + } + + private static void dataSourceConfig( + Map<String, String> userConfig, Map<String, Object> springConfig) { + + String dialect = userConfig.remove("datasource.dialect"); + if (StringUtils.isBlank(dialect)) { + throw new ExceptionInInitializerError( + "datasource.dialect is required, please check config.yaml"); + } + switch (dialect.toLowerCase()) { + case "mysql": + springConfig.put("spring.datasource.driver-class-name", "com.mysql.cj.jdbc.Driver"); + break; + case "pgsql": + springConfig.put("spring.datasource.driver-class-name", "org.postgresql.Driver"); + break; + case "h2": + springConfig.put("spring.datasource.driver-class-name", "org.h2.Driver"); + springConfig.put("spring.datasource.username", "sa"); + springConfig.put("spring.datasource.password", "sa"); + springConfig.put( + "spring.datasource.url", + "jdbc:h2:mem:streampark;MODE=MySQL;DB_CLOSE_DELAY=-1;DATABASE_TO_LOWER=true;INIT=runscript from 'classpath:db/schema-h2.sql'"); + springConfig.put("spring.sql.init.data-locations", "classpath:db/data-h2.sql"); + springConfig.put("spring.sql.init.continue-on-error", "true"); + springConfig.put("spring.sql.init.username", "sa"); + springConfig.put("spring.sql.init.password", "sa"); + springConfig.put("spring.sql.init.mode", "always"); + break; + default: + throw new UnsupportedOperationException("Unsupported datasource dialect: " + dialect); + } + } + + private static void mergeConfig( + Map<String, String> userConfig, Map<String, Object> springConfig) { + + Map<String, String> configMapping = Maps.newHashMap(); + configMapping.put("datasource.username", "spring.datasource.username"); + configMapping.put("datasource.password", "spring.datasource.password"); + configMapping.put("datasource.url", "spring.datasource.url"); + + userConfig.forEach( + (k, v) -> { + if (StringUtils.isNotBlank(v)) { + String key = configMapping.get(k); + if (key != null) { + springConfig.put(key, v); + } else { + springConfig.put(k, v); + } + } + }); + } + + private static Map<String, String> getUserConfig() { + String appHome = SystemPropertyUtils.get(ConfigConst.KEY_APP_HOME(), null); + if (appHome != null) { + File file = new File(appHome + "/conf/config.yaml"); + if (file.exists() && file.isFile()) { + return PropertiesUtils.fromYamlFileAsJava(file.getAbsolutePath()); + } + } else { + InputStream inputStream = + SpringProperties.class.getClassLoader().getResourceAsStream("config.yaml"); + return PropertiesUtils.fromYamlFileAsJava(inputStream); + } + throw new ExceptionInInitializerError("config.yaml not found"); + } + + private static Map<String, Object> getSpringConfig() { + Map<String, Object> config = Maps.newHashMap(); + // env + config.put("spring.devtools.restart.enabled", "false"); + config.put("spring.aop.proxy-target-class", "true"); + config.put("spring.messages.encoding", "utf-8"); + config.put("spring.main.allow-circular-references", "true"); + config.put("spring.main.banner-mode", "false"); + config.put("spring.application.name", "StreamPark"); + config.put("spring.mvc.converters.preferred-json-mapper", "jackson"); + + // jackson + config.put("spring.jackson.date-format", "yyyy-MM-dd HH:mm:ss"); + config.put("spring.jackson.time-zone", "GMT+8"); + config.put("spring.jackson.deserialization.fail-on-unknown-properties", "false"); + + // multipart + config.put("spring.servlet.multipart.enabled", "true"); + config.put("spring.servlet.multipart.max-file-size", "-1"); + config.put("spring.servlet.multipart.max-request-size", "-1"); + + // swagger-ui + config.put("knife4j.enable", "false"); + config.put("knife4j.basic.enable", "false"); + config.put("springdoc.api-docs.enabled", "false"); + config.put("knife4j.basic.username", "admin"); + config.put("knife4j.basic.password", "streampark"); + config.put("springdoc.swagger-ui.path", "/swagger-ui.html"); + config.put("springdoc.packages-to-scan", "org.apache.streampark.console"); + config.put("spring.mvc.pathmatch.matching-strategy", "ant_path_matcher"); + + return config; + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/properties/ShiroProperties.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/properties/ShiroProperties.java deleted file mode 100644 index f8e2a129b..000000000 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/properties/ShiroProperties.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.console.base.properties; - -import lombok.Getter; -import lombok.Setter; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.Configuration; - -@Getter -@Setter -@Configuration -@ConfigurationProperties(prefix = "streampark.shiro") -public class ShiroProperties { - - private String anonUrl; - - /** token default effective time: 1d */ - private Long jwtTimeOut = 86400L; -} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTFilter.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTFilter.java index 6a597a1ec..9d3b40991 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTFilter.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTFilter.java @@ -17,16 +17,12 @@ package org.apache.streampark.console.system.authentication; -import org.apache.streampark.console.base.properties.ShiroProperties; -import org.apache.streampark.console.base.util.SpringContextUtils; import org.apache.streampark.console.base.util.WebUtils; import org.apache.streampark.console.core.enums.AuthenticationType; -import org.apache.commons.lang3.StringUtils; import org.apache.shiro.authz.UnauthorizedException; import org.apache.shiro.web.filter.authc.BasicHttpAuthenticationFilter; -import com.baomidou.mybatisplus.core.toolkit.StringPool; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; import org.springframework.util.AntPathMatcher; @@ -48,18 +44,6 @@ public class JWTFilter extends BasicHttpAuthenticationFilter { protected boolean isAccessAllowed( ServletRequest request, ServletResponse response, Object mappedValue) throws UnauthorizedException { - HttpServletRequest httpServletRequest = (HttpServletRequest) request; - ShiroProperties properties = SpringContextUtils.getBean(ShiroProperties.class); - String[] anonUrl = - StringUtils.splitByWholeSeparatorPreserveAllTokens( - properties.getAnonUrl(), StringPool.COMMA); - - for (String u : anonUrl) { - if (pathMatcher.match(u.trim(), httpServletRequest.getRequestURI())) { - return true; - } - } - if (isLoginAttempt(request, response)) { return executeLogin(request, response); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java index 5f89c0f9a..8975a69a5 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/authentication/JWTUtil.java @@ -17,8 +17,6 @@ package org.apache.streampark.console.system.authentication; -import org.apache.streampark.console.base.properties.ShiroProperties; -import org.apache.streampark.console.base.util.SpringContextUtils; import org.apache.streampark.console.core.enums.AuthenticationType; import com.auth0.jwt.JWT; @@ -28,12 +26,12 @@ import com.auth0.jwt.interfaces.DecodedJWT; import lombok.extern.slf4j.Slf4j; import java.util.Date; +import java.util.regex.Pattern; @Slf4j public class JWTUtil { - private static final long JWT_TIME_OUT = - SpringContextUtils.getBean(ShiroProperties.class).getJwtTimeOut() * 1000; + private static Long TTL_SECOND; /** * verify token @@ -90,7 +88,9 @@ public class JWTUtil { */ public static String sign( Long userId, String userName, String secret, AuthenticationType authType) { - return sign(userId, userName, secret, authType, getExpireTime()); + Long second = getTTLOfSecond() * 1000; + Long ttl = System.currentTimeMillis() + second; + return sign(userId, userName, secret, authType, ttl); } /** @@ -113,8 +113,29 @@ public class JWTUtil { .sign(algorithm); } - /** get token expire timestamp */ - private static Long getExpireTime() { - return System.currentTimeMillis() + JWT_TIME_OUT; + public static Long getTTLOfSecond() { + if (TTL_SECOND == null) { + String ttl = System.getProperty("server.session.ttl", "24h").trim(); + String regexp = "^\\d+(s|m|h|d)$"; + Pattern pattern = Pattern.compile(regexp); + if (!pattern.matcher(ttl).matches()) { + throw new IllegalArgumentException( + "server.session.ttl is invalid, Time units must be [s|m|h|d], e.g: 24h, 2d... please check config.yaml "); + } + String unit = ttl.substring(ttl.length() - 1); + String time = ttl.substring(0, ttl.length() - 1); + Long second = Long.parseLong(time); + switch (unit) { + case "m": + return TTL_SECOND = second * 60; + case "h": + return TTL_SECOND = second * 60 * 60; + case "d": + return TTL_SECOND = second * 24 * 60 * 60; + default: + return TTL_SECOND = second; + } + } + return TTL_SECOND; } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/PassportController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/PassportController.java index ee0fc8f39..5334e68e2 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/PassportController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/controller/PassportController.java @@ -20,7 +20,6 @@ package org.apache.streampark.console.system.controller; import org.apache.streampark.common.util.DateUtils; import org.apache.streampark.console.base.domain.ResponseCode; import org.apache.streampark.console.base.domain.RestResponse; -import org.apache.streampark.console.base.properties.ShiroProperties; import org.apache.streampark.console.base.util.WebUtils; import org.apache.streampark.console.core.enums.AuthenticationType; import org.apache.streampark.console.system.authentication.JWTToken; @@ -56,8 +55,6 @@ public class PassportController { @Autowired private UserService userService; - @Autowired private ShiroProperties properties; - @Autowired private Authenticator authenticator; @Operation(summary = "Signin") @@ -95,7 +92,7 @@ public class PassportController { this.userService.updateLoginTime(username); String sign = JWTUtil.sign(user.getUserId(), username, user.getSalt(), AuthenticationType.SIGN); - LocalDateTime expireTime = LocalDateTime.now().plusSeconds(properties.getJwtTimeOut()); + LocalDateTime expireTime = LocalDateTime.now().plusSeconds(JWTUtil.getTTLOfSecond()); String ttl = DateUtils.formatFullTime(expireTime); // shiro login diff --git a/streampark-console/streampark-console-service/src/main/resources/application-h2.yml b/streampark-console/streampark-console-service/src/main/resources/application-h2.yml deleted file mode 100644 index 1a469c8ab..000000000 --- a/streampark-console/streampark-console-service/src/main/resources/application-h2.yml +++ /dev/null @@ -1,30 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -spring: - datasource: - driver-class-name: org.h2.Driver - url: jdbc:h2:mem:streampark;MODE=MySQL;DB_CLOSE_DELAY=-1;DATABASE_TO_LOWER=true;INIT=runscript from 'classpath:db/schema-h2.sql' - username: sa - password: sa - sql: - init: - data-locations: classpath:db/data-h2.sql - continue-on-error: true - username: sa - password: sa - mode: always diff --git a/streampark-console/streampark-console-service/src/main/resources/application-mysql.yml b/streampark-console/streampark-console-service/src/main/resources/application-mysql.yml deleted file mode 100644 index 986ed37cf..000000000 --- a/streampark-console/streampark-console-service/src/main/resources/application-mysql.yml +++ /dev/null @@ -1,23 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -spring: - datasource: - username: root - password: streampark - driver-class-name: com.mysql.cj.jdbc.Driver - url: jdbc:mysql://localhost:3306/streampark?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8 diff --git a/streampark-console/streampark-console-service/src/main/resources/application-pgsql.yml b/streampark-console/streampark-console-service/src/main/resources/application-pgsql.yml deleted file mode 100644 index 9c71b81c5..000000000 --- a/streampark-console/streampark-console-service/src/main/resources/application-pgsql.yml +++ /dev/null @@ -1,23 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -spring: - datasource: - username: postgres - password: streampark - driver-class-name: org.postgresql.Driver - url: jdbc:postgresql://localhost:5432/streampark?stringtype=unspecified diff --git a/streampark-console/streampark-console-service/src/main/resources/application.yml b/streampark-console/streampark-console-service/src/main/resources/application.yml deleted file mode 100644 index 223fa0ff3..000000000 --- a/streampark-console/streampark-console-service/src/main/resources/application.yml +++ /dev/null @@ -1,149 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -server: - port: 10000 - undertow: - buffer-size: 1024 - direct-buffers: true - threads: - io: 4 - worker: 20 - -logging: - level: - root: info - -knife4j: - enable: true - basic: - # basic authentication, used to access swagger-ui and doc - enable: false - username: admin - password: streampark - -springdoc: - api-docs: - enabled: true - swagger-ui: - path: /swagger-ui.html - packages-to-scan: org.apache.streampark.console - -spring: - profiles.active: h2 #[h2,pgsql,mysql] - application.name: StreamPark - devtools.restart.enabled: false - mvc.pathmatch.matching-strategy: ant_path_matcher - servlet: - multipart: - enabled: true - max-file-size: 500MB - max-request-size: 500MB - aop.proxy-target-class: true - messages.encoding: utf-8 - jackson: - date-format: yyyy-MM-dd HH:mm:ss - time-zone: GMT+8 - deserialization: - fail-on-unknown-properties: false - main: - allow-circular-references: true - banner-mode: off - mvc: - converters: - preferred-json-mapper: jackson - -management: - endpoints: - web: - exposure: - include: [ 'health', 'httptrace', 'metrics' ] - endpoint: - health: - enabled: true - show-details: always - probes: - enabled: true - health: - ldap: - enabled: false - -streampark: - proxy: - # knox process address https://cdpsit02.example.cn:8443/gateway/cdp-proxy/yarn - yarn-url: - # lark alert proxy,default https://open.feishu.cn - lark-url: - yarn: - # default simple, or kerberos - http-auth: simple - - # HADOOP_USER_NAME - hadoop-user-name: hdfs - # local workspace, used to store source code and build dir etc. - workspace: - local: /opt/streampark_workspace - remote: hdfs:///streampark # support hdfs:///streampark/ 、 /streampark 、hdfs://host:ip/streampark/ - - # remote docker register namespace for streampark - docker: - # instantiating DockerHttpClient - http-client: - max-connections: 10000 - connection-timeout-sec: 10000 - response-timeout-sec: 12000 - docker-host: "" - - # flink-k8s tracking configuration - flink-k8s: - tracking: - silent-state-keep-sec: 10 - polling-task-timeout-sec: - job-status: 120 - cluster-metric: 120 - polling-interval-sec: - job-status: 2 - cluster-metric: 3 - # If you need to specify an ingress controller, you can use this. - ingress: - class: nginx - - # packer garbage resources collection configuration - packer-gc: - # maximum retention time for temporary build resources - max-resource-expired-hours: 120 - # gc task running interval hours - exec-cron: 0 0 0/6 * * ? - - shiro: - # token timeout, unit second - jwtTimeOut: 86400 - # backend authentication-free resources url - anonUrl: > - -ldap: - # Is ldap enabled? If so, please modify the urls - enable: false - ## AD server IP, default port 389 - urls: ldap://99.99.99.99:389 - ## Login Account - base-dn: dc=streampark,dc=com - username: cn=Manager,dc=streampark,dc=com - password: streampark - user: - identity-attribute: uid - email-attribute: mail diff --git a/streampark-console/streampark-console-service/src/main/resources/config.yaml b/streampark-console/streampark-console-service/src/main/resources/config.yaml new file mode 100644 index 000000000..6a2995a21 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/resources/config.yaml @@ -0,0 +1,68 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# logging level +logging.level.root: info +# server port +server.port: 10000 +# see: https://github.com/undertow-io/undertow/blob/master/core/src/main/java/io/undertow/Undertow.java +server.undertow.io-threads: 16 + +# system database, default h2, mysql|pgsql|h2 +datasource.dialect: h2 # h2, pgsql +# if datasource.dialect is mysql or pgsql, it is necessary to set... +datasource.username: root +datasource.password: streampark +# datasource.url: jdbc:postgresql://localhost:5432/streampark?stringtype=unspecified +datasource.url: jdbc:mysql://localhost:3306/streampark?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8 + +# Directory for storing locally built project +streampark.workspace.local: /tmp/streampark +# The root hdfs path of the jars, Same as yarn.provided.lib.dirs for flink on yarn-application +# and Same as --jars for spark on yarn +streampark.workspace.remote: hdfs:///streampark/ +# hadoop yarn proxy path, e.g: knox process address https://streampark.com:8443/proxy/yarn +streampark.proxy.yarn-url: +# lark proxy address, default https://open.feishu.cn +streampark.proxy.lark-url: +# flink on k8s ingress setting, If an ingress controller is specified in the configuration, the ingress class +# kubernetes.io/ingress.class must be specified when creating the ingress, since there are often +# multiple ingress controllers in a production environment. +streampark.k8s.ingress.class: nginx +# flink on yarn or spark on yarn, monitoring job status from yarn, it is necessary to set hadoop.http.authentication.type +streampark.yarn.http-auth: simple # default simple, or kerberos +# flink on yarn or spark on yarn, it is necessary to set +streampark.hadoop-user-name: hdfs + +# sign streampark with ldap. +ldap.enable: false # Is ldap enabled? If so, please modify the urls +ldap.urls: ldap://99.99.99.99:389 #AD server IP, default port 389 +ldap.base-dn: dc=streampark,dc=com # Login Account +ldap.username: cn=Manager,dc=streampark,dc=com +ldap.password: streampark +ldap.user.identity-attribute: uid +ldap.user.email-attribute: mail + +# flink on yarn or spark on yarn, when the hadoop cluster enable kerberos authentication, +# it is necessary to set up Kerberos authentication related parameters. +security.kerberos.login.enable: false +security.kerberos.login.debug: false +# kerberos principal path +security.kerberos.login.principal: +security.kerberos.login.krb5: +security.kerberos.login.keytab: +security.kerberos.ttl: 2h # unit [s|m|h|d] diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala index 1ac15bfdc..a54216d6a 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala @@ -23,9 +23,10 @@ import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder import io.fabric8.kubernetes.client.DefaultKubernetesClient import org.apache.flink.client.program.ClusterClient +import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ import scala.language.postfixOps -import scala.util.Try +import scala.util.{Failure, Success, Try} class IngressStrategyV1 extends IngressStrategy { @@ -35,17 +36,25 @@ class IngressStrategyV1 extends IngressStrategy { Utils.using(new DefaultKubernetesClient) { client => Try { - Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get) - .map(ingress => ingress.getSpec.getRules.get(0)) - .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath) - .map { case (host, path) => s"http://$host$path" } - .getOrElse { - Utils.using(clusterClient)(client => client.getWebInterfaceURL) - } - }.recover { - case e => + val ingress = + Try(client.network.v1.ingresses().inNamespace(nameSpace).withName(clusterId).get()) + .getOrElse(null) + if (ingress == null) { + Utils.using(clusterClient)(client => client.getWebInterfaceURL) + } else { + Option(ingress) + .map(ingress => ingress.getSpec.getRules.head) + .map(rule => rule.getHost -> rule.getHttp.getPaths.head.getPath) + .map { case (host, path) => s"http://$host$path" } + .getOrElse { + Utils.using(clusterClient)(client => client.getWebInterfaceURL) + } + } + } match { + case Success(value) => value + case Failure(e) => throw new RuntimeException(s"[StreamPark] get ingressUrlAddress error: $e") - }.get + } } } diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala index 408210a0c..20ffdab8d 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala @@ -25,9 +25,10 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient import org.apache.commons.lang3.StringUtils import org.apache.flink.client.program.ClusterClient +import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ import scala.language.postfixOps -import scala.util.Try +import scala.util.{Failure, Success, Try} class IngressStrategyV1beta1 extends IngressStrategy { @@ -37,17 +38,25 @@ class IngressStrategyV1beta1 extends IngressStrategy { Utils.using(new DefaultKubernetesClient) { client => Try { - Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get) - .map(ingress => ingress.getSpec.getRules.get(0)) - .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath) - .map { case (host, path) => s"http://$host$path" } - .getOrElse { - Utils.using(clusterClient)(client => client.getWebInterfaceURL) - } - }.recover { - case e => + val ingress = + Try(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get) + .getOrElse(null) + if (ingress == null) { + Utils.using(clusterClient)(client => client.getWebInterfaceURL) + } else { + Option(ingress) + .map(ingress => ingress.getSpec.getRules.head) + .map(rule => rule.getHost -> rule.getHttp.getPaths.head.getPath) + .map { case (host, path) => s"http://$host$path" } + .getOrElse { + Utils.using(clusterClient)(client => client.getWebInterfaceURL) + } + } + } match { + case Success(value) => value + case Failure(e) => throw new RuntimeException(s"[StreamPark] get ingressUrlAddress error: $e") - }.get + } } }
