This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch quickstart in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 6486e343c90b480dbdf3166a55bbe3ff63089139 Author: benjobs <[email protected]> AuthorDate: Tue Jun 11 17:51:39 2024 +0800 [Feat] quick install script support. --- .../conf/config.yaml | 88 ++++++++ .../streampark/common/util/PropertiesUtils.scala | 82 +++++-- .../src/main/assembly/bin/streampark.sh | 56 ++--- .../console/base/util/BashJavaUtils.java | 75 ++++++- .../ApplicationBuildPipelineController.java | 52 +---- .../core/controller/FlinkClusterController.java | 5 +- .../console/core/entity/Application.java | 2 +- .../console/core/runner/QuickStartRunner.java | 97 ++++++++ .../console/core/service/ApplicationService.java | 5 +- .../console/core/service/FlinkClusterService.java | 2 +- .../core/service/impl/ApplicationServiceImpl.java | 50 +++++ .../core/service/impl/FlinkClusterServiceImpl.java | 4 +- .../console/system/runner/StartedUpRunner.java | 19 +- streampark.sh | 248 +++++++++++++++++++++ 14 files changed, 671 insertions(+), 114 deletions(-) diff --git a/apache-streampark_2.12-2.1.4-incubating-bin/conf/config.yaml b/apache-streampark_2.12-2.1.4-incubating-bin/conf/config.yaml new file mode 100644 index 000000000..c6845eaad --- /dev/null +++ b/apache-streampark_2.12-2.1.4-incubating-bin/conf/config.yaml @@ -0,0 +1,88 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +logging: + level: + root: info + +server: + port: 10000 + session: + # The user's login session has a validity period. If it exceeds this time, the user will be automatically logout + # unit: s|m|h|d, s: second, m:minute, h:hour, d: day + ttl: 2h # unit[s|m|h|d], e.g: 24h, 2d.... + undertow: # see: https://github.com/undertow-io/undertow/blob/master/core/src/main/java/io/undertow/Undertow.java + buffer-size: 1024 + direct-buffers: true + threads: + io: 16 + worker: 256 + +# system database, default h2, mysql|pgsql|h2 +datasource: + dialect: h2 #h2, mysql, pgsql + # if datasource.dialect is mysql or pgsql, you need to configure the following connection information + # mysql/postgresql connect user + username: + # mysql/postgresql connect password + password: + # mysql/postgresql connect jdbcURL + # mysql example: datasource.url: jdbc:mysql://localhost:3306/streampark?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8 + # postgresql example: jdbc:postgresql://localhost:5432/streampark?stringtype=unspecified + url: + +streampark: + workspace: + # Local workspace, storage directory of clone projects and compiled projects,Do not set under $APP_HOME. Set it to a directory outside of $APP_HOME. 
+ local: /tmp/streampark + # The root hdfs path of the jars, Same as yarn.provided.lib.dirs for flink on yarn-application and Same as --jars for spark on yarn + remote: hdfs:///streampark/ + proxy: + # lark proxy address, default https://open.feishu.cn + lark-url: + # hadoop yarn proxy path, e.g: knox process address https://streampark.com:8443/proxy/yarn + yarn-url: + yarn: + # flink on yarn or spark on yarn, monitoring job status from yarn, it is necessary to set hadoop.http.authentication.type + http-auth: 'simple' # default simple, or kerberos + # flink on yarn or spark on yarn, HADOOP_USER_NAME + hadoop-user-name: hdfs + project: + # Number of projects allowed to be running at the same time , If there is no limit, -1 can be configured + max-build: 16 + +# flink on yarn or spark on yarn, when the hadoop cluster enable kerberos authentication, it is necessary to set Kerberos authentication parameters. +security: + kerberos: + login: + debug: false + enable: false + keytab: + krb5: + principal: + ttl: 2h # unit [s|m|h|d] + +# sign streampark with ldap. 
+ldap: + base-dn: dc=streampark,dc=com # Login Account + enable: false # ldap enabled' + username: cn=Manager,dc=streampark,dc=com + password: streampark + urls: ldap://99.99.99.99:389 #AD server IP, default port 389 + user: + email-attribute: mail + identity-attribute: uid diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala index 03aa13145..6adc03cf7 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala @@ -31,7 +31,6 @@ import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, Map => MutableMap} -import scala.util.Try object PropertiesUtils extends Logger { @@ -308,39 +307,76 @@ object PropertiesUtils extends Logger { @Nonnull def extractArguments(args: String): List[String] = { val programArgs = new ArrayBuffer[String]() if (StringUtils.isNotEmpty(args)) { - val array = args.split("\\s+") - val iter = array.iterator - while (iter.hasNext) { - val v = iter.next() - val p = v.take(1) - p match { - case "'" | "\"" => - var value = v - if (!v.endsWith(p)) { - while (!value.endsWith(p) && iter.hasNext) { - value += s" ${iter.next()}" - } + return extractArguments(args.split("\\s+")) + } + programArgs.toList + } + + def extractArguments(array: Array[String]): List[String] = { + val programArgs = new ArrayBuffer[String]() + val iter = array.iterator + while (iter.hasNext) { + val v = iter.next() + val p = v.take(1) + p match { + case "'" | "\"" => + var value = v + if (!v.endsWith(p)) { + while (!value.endsWith(p) && iter.hasNext) { + value += s" ${iter.next()}" } - programArgs += value.substring(1, value.length - 1) - case _ => - val regexp = "(.*)='(.*)'$" + } + programArgs += 
value.substring(1, value.length - 1) + case _ => + val regexp = "(.*)='(.*)'$" + if (v.matches(regexp)) { + programArgs += v.replaceAll(regexp, "$1=$2") + } else { + val regexp = "(.*)=\"(.*)\"$" if (v.matches(regexp)) { programArgs += v.replaceAll(regexp, "$1=$2") } else { - val regexp = "(.*)=\"(.*)\"$" - if (v.matches(regexp)) { - programArgs += v.replaceAll(regexp, "$1=$2") - } else { - programArgs += v - } + programArgs += v } - } + } } } programArgs.toList } + def extractMultipleArguments(array: Array[String]): Map[String, Map[String, String]] = { + val iter = array.iterator + val map = mutable.Map[String, mutable.Map[String, String]]() + while (iter.hasNext) { + val v = iter.next() + v.take(2) match { + case "--" => + val kv = iter.next() + val regexp = "(.*)=(.*)" + if (kv.matches(regexp)) { + val values = kv.split("=") + val k1 = values(0).trim + val v1 = values(1).replaceAll("^['|\"]|['|\"]$", "") + val k = v.drop(2) + map.get(k) match { + case Some(m) => m += k1 -> v1 + case _ => map += k -> mutable.Map(k1 -> v1) + } + } + case _ => + } + } + map.map(x => x._1 -> x._2.toMap).toMap + } + @Nonnull def extractDynamicPropertiesAsJava(properties: String): JavaMap[String, String] = new JavaMap[String, String](extractDynamicProperties(properties).asJava) + @Nonnull def extractMultipleArgumentsAsJava( + args: Array[String]): JavaMap[String, JavaMap[String, String]] = { + val map = + extractMultipleArguments(args).map(c => c._1 -> new JavaMap[String, String](c._2.asJava)) + new JavaMap[String, JavaMap[String, String]](map.asJava) + } + } diff --git a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh index 81af07693..5e0e550ce 100755 --- a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh +++ b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh @@ -241,10 +241,23 @@ if [[ "$USE_NOHUP" = 
"true" ]]; then NOHUP="nohup" fi -BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils" +CONFIG="${APP_CONF}/application.yml" +# shellcheck disable=SC2006 +if [[ -f "$CONFIG" ]] ; then + echo_y """[WARN] in the \"conf\" directory, found the \"application.yml\" file. The \"application.yml\" file is deprecated. + For compatibility, this application.yml will be used preferentially. The latest configuration file is \"config.yaml\". It is recommended to use \"config.yaml\". + Note: \"application.yml\" will be completely deprecated in version 2.2.0. """ +else + CONFIG="${APP_CONF}/config.yaml" + if [[ ! -f "$CONFIG" ]] ; then + echo_r "can not found config.yaml in \"conf\" directory, please check." + exit 1; + fi +fi +BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils" APP_MAIN="org.apache.streampark.console.StreamParkConsoleBootstrap" - +SERVER_PORT=$($_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --get_yaml "server.port" "$CONFIG") JVM_OPTS_FILE=${APP_HOME}/bin/jvm_opts.sh JVM_ARGS="" @@ -276,21 +289,8 @@ print_logo() { printf ' %s WebSite: https://streampark.apache.org%s\n' $BLUE $RESET printf ' %s GitHub : http://github.com/apache/streampark%s\n\n' $BLUE $RESET printf ' %s ──────── Apache StreamPark, Make stream processing easier ô~ô!%s\n\n' $PRIMARY $RESET -} - -init_env() { - # shellcheck disable=SC2006 - CONFIG="${APP_CONF}/application.yml" - if [[ -f "$CONFIG" ]] ; then - echo_y """[WARN] in the \"conf\" directory, found the \"application.yml\" file. The \"application.yml\" file is deprecated. - For compatibility, this application.yml will be used preferentially. The latest configuration file is \"config.yaml\". It is recommended to use \"config.yaml\". - Note: \"application.yml\" will be completely deprecated in version 2.2.0. """ - else - CONFIG="${APP_CONF}/config.yaml" - if [[ ! -f "$CONFIG" ]] ; then - echo_r "can not found config.yaml in \"conf\" directory, please check." 
- exit 1; - fi + if [[ "$1"x == "start"x ]]; then + printf ' %s http://localhost:%s %s\n\n' $PRIMARY $SERVER_PORT $RESET fi } @@ -313,19 +313,19 @@ get_pid() { fi # shellcheck disable=SC2006 - local serverPort=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --get_yaml "server.port" "$CONFIG"` - if [[ x"${serverPort}" == x"" ]]; then + if [[ "${SERVER_PORT}"x == ""x ]]; then echo_r "server.port is required, please check $CONFIG" exit 1; else # shellcheck disable=SC2006 # shellcheck disable=SC2155 - local used=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --check_port "$serverPort"` - if [[ x"${used}" == x"used" ]]; then + local used=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --check_port "$SERVER_PORT"` + if [[ "${used}"x == "used"x ]]; then # shellcheck disable=SC2006 local PID=`jps -l | grep "$APP_MAIN" | awk '{print $1}'` + # shellcheck disable=SC2236 if [[ ! -z $PID ]]; then - echo $PID + echo "$PID" else echo 0 fi @@ -411,7 +411,7 @@ start() { -Dapp.home="${APP_HOME}" \ -Dlogging.config="${APP_CONF}/logback-spring.xml" \ -Djava.io.tmpdir="$APP_TMPDIR" \ - $APP_MAIN >> "$APP_OUT" 2>&1 "&" + $APP_MAIN "$@" >> "$APP_OUT" 2>&1 "&" local PID=$! local IS_NUMBER="^[0-9]+$" @@ -565,27 +565,31 @@ restart() { } main() { - print_logo - init_env case "$1" in "debug") DEBUG_PORT=$2 debug ;; "start") - start + shift + start "$@" + [[ $? -eq 0 ]] && print_logo "start" ;; "start_docker") + print_logo start_docker ;; "stop") + print_logo stop ;; "status") + print_logo status ;; "restart") restart + [[ $? 
-eq 0 ]] && print_logo "start" ;; *) echo_r "Unknown command: $1" diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java index faf8b8ead..53492b436 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java @@ -17,15 +17,24 @@ package org.apache.streampark.console.base.util; +import org.apache.streampark.common.conf.FlinkVersion; +import org.apache.streampark.common.util.FileUtils; import org.apache.streampark.common.util.PropertiesUtils; +import org.apache.commons.io.output.NullOutputStream; + +import java.io.File; +import java.io.FileWriter; import java.io.IOException; -import java.net.ServerSocket; +import java.io.PrintStream; +import java.net.Socket; import java.util.Arrays; import java.util.Map; public class BashJavaUtils { + private static String localhost = "localhost"; + public static void main(String[] args) throws IOException { String action = args[0].toLowerCase(); String[] actionArgs = Arrays.copyOfRange(args, 1, args.length); @@ -39,12 +48,66 @@ public class BashJavaUtils { System.out.println(value); break; case "--check_port": - Integer port = Integer.parseInt(actionArgs[0]); - try { - new ServerSocket(port); - System.out.println("free"); - } catch (Exception e) { + int port = Integer.parseInt(actionArgs[0]); + try (Socket ignored = new Socket(localhost, port)) { System.out.println("used"); + } catch (Exception e) { + System.out.println("free"); + } + break; + case "--free_port": + int start = Integer.parseInt(actionArgs[0]); + for (port = start; port < 65535; port++) { + try (Socket ignored = new Socket(localhost, port)) { + } catch (Exception e) { + 
System.out.println(port); + break; + } + } + break; + case "--read_flink": + String input = actionArgs[0]; + String[] inputs = input.split(":"); + String flinkDist = + Arrays.stream(inputs).filter(c -> c.contains("flink-dist-")).findFirst().get(); + File flinkHome = new File(flinkDist.replaceAll("/lib/.*", "")); + FlinkVersion flinkVersion = new FlinkVersion(flinkHome.getAbsolutePath()); + + PrintStream originalOut = System.out; + System.setOut(new PrintStream(new NullOutputStream())); + + String version = flinkVersion.majorVersion(); + float ver = Float.parseFloat(version); + File yaml = + new File(flinkHome, ver < 1.19f ? "/conf/flink-conf.yaml" : "/conf/config.yaml"); + + Map<String, String> config = PropertiesUtils.fromYamlFileAsJava(yaml.getAbsolutePath()); + String flinkPort = config.getOrDefault("rest.port", "8081"); + System.setOut(originalOut); + System.out.println( + flinkHome + .getAbsolutePath() + .concat(",") + .concat(flinkHome.getName()) + .concat(",") + .concat(flinkPort)); + break; + case "--replace": + String filePath = actionArgs[0]; + String[] text = actionArgs[1].split("\\|\\|"); + String searchText = text[0]; + String replaceText = text[1]; + try { + File file = new File(filePath); + String content = FileUtils.readString(file); + content = content.replace(searchText, replaceText); + FileWriter writer = new FileWriter(filePath); + writer.write(content); + writer.flush(); + writer.close(); + System.exit(0); + } catch (IOException e) { + System.exit(1); } break; default: diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java index b86a63cfb..5b2361baa 100644 --- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java @@ -18,13 +18,9 @@ package org.apache.streampark.console.core.controller; import org.apache.streampark.console.base.domain.RestResponse; -import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.core.annotation.PermissionScope; import org.apache.streampark.console.core.bean.AppBuildDockerResolvedDetail; import org.apache.streampark.console.core.entity.AppBuildPipeline; -import org.apache.streampark.console.core.entity.Application; -import org.apache.streampark.console.core.entity.ApplicationLog; -import org.apache.streampark.console.core.entity.FlinkEnv; import org.apache.streampark.console.core.service.AppBuildPipeService; import org.apache.streampark.console.core.service.ApplicationLogService; import org.apache.streampark.console.core.service.ApplicationService; @@ -42,7 +38,6 @@ import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; -import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -74,52 +69,7 @@ public class ApplicationBuildPipelineController { @PostMapping(value = "build") @RequiresPermissions("app:create") public RestResponse buildApplication(Long appId, boolean forceBuild) throws Exception { - Application app = applicationService.getById(appId); - - ApiAlertException.throwIfNull( - app.getVersionId(), "Please bind a Flink version to the current flink job."); - // 1) check flink version - FlinkEnv env = flinkEnvService.getById(app.getVersionId()); - boolean checkVersion = env.getFlinkVersion().checkVersion(false); - if (!checkVersion) 
{ - throw new ApiAlertException("Unsupported flink version: " + env.getFlinkVersion().version()); - } - - // 2) check env - boolean envOk = applicationService.checkEnv(app); - if (!envOk) { - throw new ApiAlertException( - "Check flink env failed, please check the flink version of this job"); - } - - if (!forceBuild && !appBuildPipeService.allowToBuildNow(appId)) { - throw new ApiAlertException( - "The job is invalid, or the job cannot be built while it is running"); - } - // check if you need to go through the build process (if the jar and pom have changed, - // you need to go through the build process, if other common parameters are modified, - // you don't need to go through the build process) - - ApplicationLog applicationLog = new ApplicationLog(); - applicationLog.setOptionName( - org.apache.streampark.console.core.enums.Operation.RELEASE.getValue()); - applicationLog.setAppId(app.getId()); - applicationLog.setOptionTime(new Date()); - - boolean needBuild = applicationService.checkBuildAndUpdate(app); - if (!needBuild) { - applicationLog.setSuccess(true); - applicationLogService.save(applicationLog); - return RestResponse.success(true); - } - - // rollback - if (app.isNeedRollback() && app.isFlinkSqlJob()) { - flinkSqlService.rollback(app); - } - - boolean actionResult = appBuildPipeService.buildApplication(app, applicationLog); - return RestResponse.success(actionResult); + return applicationService.buildApplication(appId, forceBuild); } /** diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java index 04c4386ba..99b92b6ae 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java +++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java @@ -22,6 +22,7 @@ import org.apache.streampark.console.base.exception.InternalException; import org.apache.streampark.console.core.bean.ResponseResult; import org.apache.streampark.console.core.entity.FlinkCluster; import org.apache.streampark.console.core.service.FlinkClusterService; +import org.apache.streampark.console.core.service.ServiceHelper; import org.apache.shiro.authz.annotation.RequiresPermissions; @@ -42,6 +43,8 @@ public class FlinkClusterController { @Autowired private FlinkClusterService flinkClusterService; + @Autowired private ServiceHelper serviceHelper; + @PostMapping("list") public RestResponse list() { List<FlinkCluster> flinkClusters = flinkClusterService.listCluster(); @@ -63,7 +66,7 @@ public class FlinkClusterController { @PostMapping("create") @RequiresPermissions("cluster:create") public RestResponse create(FlinkCluster cluster) { - Boolean success = flinkClusterService.create(cluster); + Boolean success = flinkClusterService.create(cluster, serviceHelper.getUserId()); return RestResponse.success(success); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java index 88db35fa1..f43058d9e 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java @@ -456,7 +456,7 @@ public class Application implements Serializable { @SuppressWarnings("unchecked") public Map<String, Object> getOptionMap() { if (StringUtils.isBlank(this.options)) { - return Collections.emptyMap(); + return new HashMap<>(); } Map<String, Object> map = 
JacksonUtils.read(this.options, Map.class); map.entrySet().removeIf(entry -> entry.getValue() == null); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java new file mode 100644 index 000000000..844da1807 --- /dev/null +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.streampark.console.core.runner; + +import org.apache.streampark.common.enums.ClusterState; +import org.apache.streampark.common.enums.ExecutionMode; +import org.apache.streampark.common.util.PropertiesUtils; +import org.apache.streampark.console.core.entity.Application; +import org.apache.streampark.console.core.entity.FlinkCluster; +import org.apache.streampark.console.core.entity.FlinkEnv; +import org.apache.streampark.console.core.entity.FlinkSql; +import org.apache.streampark.console.core.service.ApplicationService; +import org.apache.streampark.console.core.service.FlinkClusterService; +import org.apache.streampark.console.core.service.FlinkEnvService; +import org.apache.streampark.console.core.service.FlinkSqlService; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.Map; + +@Order +@Slf4j +@Component +public class QuickStartRunner implements ApplicationRunner { + + @Autowired private FlinkEnvService flinkEnvService; + + @Autowired private ApplicationService applicationService; + + @Autowired private FlinkClusterService flinkClusterService; + + @Autowired private FlinkSqlService flinkSqlService; + + private static Long demoAppId = 100000L; + + @Override + public void run(ApplicationArguments args) throws Exception { + Map<String, HashMap<String, String>> map = + PropertiesUtils.extractMultipleArgumentsAsJava(args.getSourceArgs()); + Map<String, String> quickstart = map.get("quickstart"); + + if (!quickstart.isEmpty() && quickstart.size() == 3) { + // 1) create flinkEnv + FlinkEnv flinkEnv = new FlinkEnv(); + flinkEnv.setFlinkName(quickstart.get("flink_name")); + flinkEnv.setFlinkHome(quickstart.get("flink_home")); + 
flinkEnvService.create(flinkEnv); + + // 2) create flinkCluster + FlinkCluster flinkCluster = new FlinkCluster(); + flinkCluster.setClusterName("quickstart"); + flinkCluster.setVersionId(flinkEnv.getId()); + flinkCluster.setClusterState(ClusterState.STARTED.getValue()); + flinkCluster.setExecutionMode(ExecutionMode.REMOTE.getMode()); + flinkCluster.setAddress("http://localhost:" + quickstart.get("flink_port")); + flinkClusterService.create(flinkCluster, 100000L); + + // 3) set flink version and cluster + Application app = new Application(); + app.setId(demoAppId); + Application application = applicationService.getApp(app); + application.setFlinkClusterId(flinkCluster.getId()); + application.setVersionId(flinkEnv.getId()); + application.setExecutionMode(ExecutionMode.REMOTE.getMode()); + + FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true); + application.setFlinkSql(flinkSql.getSql()); + + boolean success = applicationService.update(application); + if (success) { + // 4) build application + applicationService.buildApplication(demoAppId, false); + } + } + } +} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java index b9ce6d768..4d37ce836 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java @@ -19,6 +19,7 @@ package org.apache.streampark.console.core.service; import org.apache.streampark.common.enums.ExecutionMode; import org.apache.streampark.console.base.domain.RestRequest; +import org.apache.streampark.console.base.domain.RestResponse; import org.apache.streampark.console.base.exception.ApplicationException; 
import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.enums.AppExistsState; @@ -67,7 +68,7 @@ public interface ApplicationService extends IService<Application> { String readConf(String config) throws IOException; - Application getApp(Application app); + Application getApp(Application application); String getMain(Application application); @@ -128,4 +129,6 @@ public interface ApplicationService extends IService<Application> { AppExistsState checkStart(Application app); List<ApplicationReport> getYARNApplication(String appName); + + RestResponse buildApplication(Long appId, boolean forceBuild) throws Exception; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java index d82387d04..a96cd4e16 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java @@ -30,7 +30,7 @@ public interface FlinkClusterService extends IService<FlinkCluster> { ResponseResult check(FlinkCluster flinkCluster); - Boolean create(FlinkCluster flinkCluster); + Boolean create(FlinkCluster flinkCluster, Long userId); void delete(Long id); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index dcdae2437..6e4f6e7ad 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -34,6 +34,7 @@ import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.base.domain.RestResponse;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApiDetailException;
 import org.apache.streampark.console.base.exception.ApplicationException;
@@ -1948,6 +1949,55 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
     }
   }
 
+  /**
+   * Validates the application identified by {@code appId} and starts its build pipeline.
+   *
+   * <p>The Flink version and env bound to the job are checked first. When {@code
+   * checkBuildAndUpdate} reports that no build is needed, a successful RELEASE log entry is saved
+   * and the method returns without building.
+   *
+   * @param appId id of the application to build
+   * @param forceBuild when true, the "allowed to build now" check is skipped
+   * @return a success response wrapping {@code true} when no build was needed, otherwise the
+   *     result returned by the build pipeline service
+   * @throws ApiAlertException if the application or its Flink env cannot be found, the Flink
+   *     version is unsupported, the env check fails, or the job may not be built right now
+   * @throws Exception propagated from the checks and the build pipeline
+   */
+  @Override
+  public RestResponse buildApplication(Long appId, boolean forceBuild) throws Exception {
+    Application app = this.getById(appId);
+    // an unknown id must surface as an alert, not as a NullPointerException
+    ApiAlertException.throwIfNull(app, "The application does not exist, appId: " + appId);
+
+    ApiAlertException.throwIfNull(
+        app.getVersionId(), "Please bind a Flink version to the current flink job.");
+    // 1) check flink version
+    FlinkEnv env = flinkEnvService.getById(app.getVersionId());
+    // the bound version may point at a Flink env that is no longer registered
+    ApiAlertException.throwIfNull(
+        env, "The Flink version bound to the current flink job does not exist.");
+    boolean checkVersion = env.getFlinkVersion().checkVersion(false);
+    if (!checkVersion) {
+      throw new ApiAlertException("Unsupported flink version: " + env.getFlinkVersion().version());
+    }
+
+    // 2) check env
+    boolean envOk = this.checkEnv(app);
+    if (!envOk) {
+      throw new ApiAlertException(
+          "Check flink env failed, please check the flink version of this job");
+    }
+
+    if (!forceBuild && !appBuildPipeService.allowToBuildNow(appId)) {
+      throw new ApiAlertException(
+          "The job is invalid, or the job cannot be built while it is running");
+    }
+    // check if you need to go through the build process (if the jar and pom have changed,
+    // you need to go through the build process, if other common parameters are modified,
+    // you don't need to go through the build process)
+
+    ApplicationLog applicationLog = new ApplicationLog();
+    applicationLog.setOptionName(
+        org.apache.streampark.console.core.enums.Operation.RELEASE.getValue());
+    applicationLog.setAppId(app.getId());
+    applicationLog.setOptionTime(new Date());
+
+    boolean needBuild = this.checkBuildAndUpdate(app);
+    if (!needBuild) {
+      applicationLog.setSuccess(true);
+      applicationLogService.save(applicationLog);
+      return RestResponse.success(true);
+    }
+
+    // rollback
+    if (app.isNeedRollback() && app.isFlinkSqlJob()) {
+      flinkSqlService.rollback(app);
+    }
+    boolean actionResult = appBuildPipeService.buildApplication(app, applicationLog);
+    return RestResponse.success(actionResult);
+  }
+
   private Tuple2<String, String> getNamespaceClusterId(Application application) {
     String clusterId = null;
     String k8sNamespace = null;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index efc2d295a..1cc9a82f4 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -139,8 +139,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
   }
 
   @Override
-  public Boolean create(FlinkCluster flinkCluster) {
-    flinkCluster.setUserId(serviceHelper.getUserId());
+  public Boolean create(FlinkCluster flinkCluster, Long userId) {
+    flinkCluster.setUserId(userId);
     boolean successful = validateQueueIfNeeded(flinkCluster);
     ApiAlertException.throwIfFalse(
         successful, String.format(ERROR_CLUSTER_QUEUE_HINT, flinkCluster.getYarnQueue()));
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
index 4a19b709f..93c0c5490 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.system.runner;
 
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.util.SystemPropertyUtils;
 
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -27,6 +27,8 @@ import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Component;
 
+import java.time.LocalDateTime;
+
 @Order
 @Slf4j
 @Component
@@ -37,7 +39,20 @@ public class StartedUpRunner implements ApplicationRunner {
   @Override
   public void run(ApplicationArguments args) {
     if (context.isActive()) {
-      ConfigConst.printLogo("streampark-console start successful");
+      // Port shown in the banner: looked up under "server.port" via SystemPropertyUtils,
+      // with "10000" as the fallback value.
+      String port = SystemPropertyUtils.get("server.port", "10000");
+      // Startup banner written straight to stdout: ASCII logo first, then version, project
+      // links, the local URL and the current time.
+      // NOTE(review): the version below is a hard-coded literal; confirm it is kept in sync
+      // with the release version.
+      System.out.println("\n");
+      System.out.println(" _____ __ __ ");
+      System.out.println(" / ___// /_________ ____ _____ ___ ____ ____ ______/ /__ ");
+      System.out.println(" \\__ \\/ __/ ___/ _ \\/ __ `/ __ `__ \\/ __ \\ __ `/ ___/ //_/");
+      System.out.println(" ___/ / /_/ / / __/ /_/ / / / / / / /_/ / /_/ / / / ,< ");
+      System.out.println(" /____/\\__/_/ \\___/\\__,_/_/ /_/ /_/ ____/\\__,_/_/ /_/|_| ");
+      System.out.println(" /_/ \n\n");
+      System.out.println(" Version: 2.1.5 ");
+      System.out.println(" WebSite: https://streampark.apache.org ");
+      System.out.println(" GitHub : https://github.com/apache/incubator-streampark ");
+      System.out.println(" Info : streampark-console start successful ");
+      System.out.println(" Local : http://localhost:" + port);
+      System.out.println(" Time : " + LocalDateTime.now() + "\n\n");
     }
   }
 }
diff --git a/streampark.sh b/streampark.sh
new file mode 100755
index 000000000..0b7e7d312
--- /dev/null
+++ b/streampark.sh
@@ -0,0 +1,248 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# shellcheck disable=SC2317
+
+# Bugzilla 37848: When no TTY is available, don't output to console
+have_tty=0
+# shellcheck disable=SC2006
+if [[ "`tty`" != "not a tty" ]]; then
+  have_tty=1
+fi
+
+# Only use colors if connected to a terminal
+if [[ ${have_tty} -eq 1 ]]; then
+  RED=$(printf '\033[31m')
+  GREEN=$(printf '\033[32m')
+  BLUE=$(printf '\033[34m')
+  RESET=$(printf '\033[0m')
+else
+  RED=""
+  GREEN=""
+  BLUE=""
+  RESET=""
+fi
+
+# Print a single message in red (errors, failures), prefixed with "[StreamPark]".
+# Usage: echo_r "<message>"; returns 1 unless exactly one argument is given.
+echo_r () {
+  # Color red: Error, Failed
+  [[ $# -ne 1 ]] && return 1
+  # The message is passed as a %s argument, never as part of the format string,
+  # so a '%' in it (e.g. in a URL) is printed literally.
+  printf "[%sStreamPark%s] %s%s%s\n" "$BLUE" "$RESET" "$RED" "$1" "$RESET"
+}
+
+# Print a single message in green (success, progress), prefixed with "[StreamPark]".
+# Usage: echo_g "<message>"; returns 1 unless exactly one argument is given.
+echo_g () {
+  # Color green: Success
+  [[ $# -ne 1 ]] && return 1
+  # The message is passed as a %s argument, never as part of the format string.
+  printf "[%sStreamPark%s] %s%s%s\n" "$BLUE" "$RESET" "$GREEN" "$1" "$RESET"
+}
+
+# OS specific support. $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+mingw=false
+case "$(uname)" in
+  CYGWIN*) cygwin=true ;;
+  MINGW*) mingw=true;;
+  Darwin*) darwin=true
+    # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
+    # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
+    if [ -z "$JAVA_HOME" ]; then
+      if [ -x "/usr/libexec/java_home" ]; then
+        JAVA_HOME="$(/usr/libexec/java_home)"; export JAVA_HOME
+      else
+        JAVA_HOME="/Library/Java/Home"; export JAVA_HOME
+      fi
+    fi
+    ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+  if [ -r /etc/gentoo-release ] ; then
+    JAVA_HOME=$(java-config --jre-home)
+  fi
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+  [ -n "$JAVA_HOME" ] &&
+    JAVA_HOME=$(cygpath --unix "$JAVA_HOME")
+  [ -n "$CLASSPATH" ] &&
+    CLASSPATH=$(cygpath --path --unix "$CLASSPATH")
+fi
+
+# For Mingw, ensure paths are in UNIX format before anything is touched
+if $mingw ; then
+  [ -n "$JAVA_HOME" ] && [ -d "$JAVA_HOME" ] &&
+    JAVA_HOME="$(cd "$JAVA_HOME" || (echo_r "cannot cd into $JAVA_HOME."; exit 1); pwd)"
+fi
+
+# JAVA_HOME still unknown: derive it from the location of javac on the PATH.
+# The paths are passed to expr/dirname/cd/readlink without embedded quote
+# characters, otherwise the lookups operate on a non-existent quoted path.
+if [ -z "$JAVA_HOME" ]; then
+  javaExecutable="$(which javac)"
+  if [ -n "$javaExecutable" ] && ! [ "$(expr "$javaExecutable" : '\([^ ]*\)')" = "no" ]; then
+    # readlink(1) is not available as standard on Solaris 10.
+    readLink=$(which readlink)
+    if [ ! "$(expr "$readLink" : '\([^ ]*\)')" = "no" ]; then
+      if $darwin ; then
+        javaHome="$(dirname "$javaExecutable")"
+        javaExecutable="$(cd "$javaHome" && pwd -P)/javac"
+      else
+        javaExecutable="$(readlink -f "$javaExecutable")"
+      fi
+      javaHome="$(dirname "$javaExecutable")"
+      javaHome=$(expr "$javaHome" : '\(.*\)/bin')
+      JAVA_HOME="$javaHome"
+      export JAVA_HOME
+    fi
+  fi
+fi
+
+if [ -z "$JAVACMD" ] ; then
+  if [ -n "$JAVA_HOME" ] ; then
+    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+      # IBM's JDK on AIX uses strange locations for the executables
+      JAVACMD="$JAVA_HOME/jre/sh/java"
+    else
+      JAVACMD="$JAVA_HOME/bin/java"
+    fi
+  else
+    JAVACMD="$(\unset -f command 2>/dev/null; \command -v java)"
+  fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+  echo_r "Error: JAVA_HOME is not defined correctly." >&2
+  echo_r " We cannot execute $JAVACMD" >&2
+  exit 1
+fi
+
+if [ -z "$JAVA_HOME" ] ; then
+  echo_r "Warning: JAVA_HOME environment variable is not set."
+fi
+
+# Use the java binary that was just verified to be executable. Building the path
+# from JAVA_HOME would yield "/bin/java" when JAVA_HOME is empty, which is only a
+# warning above.
+_RUNJAVA="$JAVACMD"
+
+# resolve links - $0 may be a softlink
+PRG="$0"
+
+while [[ -h "$PRG" ]]; do
+  # shellcheck disable=SC2006
+  ls=`ls -ld "$PRG"`
+  # shellcheck disable=SC2006
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '/.*' > /dev/null; then
+    PRG="$link"
+  else
+    # shellcheck disable=SC2006
+    PRG=`dirname "$PRG"`/"$link"
+  fi
+done
+
+# Get standard environment variables
+# shellcheck disable=SC2006
+PRG_DIR=`dirname "$PRG"`
+WORK_DIR=$(cd "$PRG_DIR" >/dev/null || exit; pwd)
+
+SP_VERSION="2.1.5"
+SP_NAME="apache-streampark_2.12-${SP_VERSION}-incubating-bin"
+SP_TAR="${SP_NAME}.tar.gz"
+SP_URL="https://archive.apache.org/dist/incubator/streampark/${SP_VERSION}/${SP_TAR}"
+SP_HOME="${WORK_DIR}"/"${SP_NAME}"
+SP_CONFIG="${SP_HOME}/conf/config.yaml"
+
+# Download a file into the current directory with wget, or curl if wget is missing.
+#   $1  url to fetch
+#   $2  name of the local file to write
+# Exits the script with status 1 if no download tool exists or the transfer fails;
+# a partially written file is removed first.
+download() {
+  local url=$1
+  local name=$2
+  local rc=0
+  if command -v wget > /dev/null; then
+    wget -O "$name" "$url" || rc=$?
+  elif command -v curl > /dev/null; then
+    # -o is required: without it curl writes the archive to stdout
+    curl -f -L -o "$name" "$url" || rc=$?
+  else
+    echo_r "download $name failed: neither wget nor curl is installed."
+    exit 1
+  fi
+  # rc holds the status of the transfer itself, not of the cleanup
+  if [[ $rc -ne 0 ]]; then
+    rm -f "$name"
+    echo_r "download $name failed. url: $url"
+    exit 1
+  fi
+}
+
+BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils"
+
+# Archives are downloaded and extracted relative to the current directory, while
+# SP_HOME is derived from WORK_DIR; run from WORK_DIR so both agree.
+cd "${WORK_DIR}" || exit 1
+
+# 1). download streampark.
+echo_g "download streampark..."
+
+# reuse an archive that is already present, otherwise fetch it
+if [[ ! -f "${SP_TAR}" ]]; then
+  download "$SP_URL" "$SP_TAR"
+fi
+
+if ! { tar -xvf "${SP_TAR}" >/dev/null 2>&1 \
+  && rm -r "${SP_TAR}" \
+  && mkdir -p "${SP_HOME}"/flink \
+  && mkdir -p "${SP_HOME}"/workspace; }; then
+  echo_r "extract ${SP_TAR} failed."
+  exit 1
+fi
+
+# 1.1) workspace
+"$_RUNJAVA" -cp "${SP_HOME}/lib/*" "$BASH_UTIL" --replace "$SP_CONFIG" "local: ||local: ${SP_HOME}/workspace #"
+
+# 1.2) port.
+SP_PORT=$("$_RUNJAVA" -cp "${SP_HOME}/lib/*" "$BASH_UTIL" --free_port "10000")
+"$_RUNJAVA" -cp "${SP_HOME}/lib/*" "$BASH_UTIL" --replace "$SP_CONFIG" "port: 10000||port: ${SP_PORT}"
+
+# 2). flink
+# shellcheck disable=SC2009
+FLINK_PROCESS="$(ps -ef | grep "flink-dist-" | grep 'org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint')"
+if [[ -n "${FLINK_PROCESS}" ]]; then
+  # a standalone session cluster is already running: reuse its home, name and port
+  FLINK_PARAM=$("$_RUNJAVA" -cp "${SP_HOME}/lib/*" "$BASH_UTIL" --read_flink "$FLINK_PROCESS")
+  IFS=',' read -r -a ARRAY <<< "$FLINK_PARAM"
+  FLINK_HOME=${ARRAY[0]}
+  FLINK_NAME=${ARRAY[1]}
+  FLINK_PORT=${ARRAY[2]}
+else
+  FLINK_NAME="flink-1.19.0"
+  FLINK_URL="https://archive.apache.org/dist/flink/${FLINK_NAME}/${FLINK_NAME}-bin-scala_2.12.tgz"
+  FLINK_TAR="${FLINK_NAME}-bin-scala_2.12.tgz"
+  FLINK_HOME="${WORK_DIR}"/${SP_NAME}/flink/${FLINK_NAME}
+  FLINK_CONF="${FLINK_HOME}/conf/config.yaml"
+
+  # 1) download flink
+  echo_g "download flink..."
+  download "$FLINK_URL" "$FLINK_TAR"
+  if ! { tar -xvf "${FLINK_TAR}" >/dev/null 2>&1 \
+    && rm -r "${FLINK_TAR}" \
+    && mv "$FLINK_NAME" "${WORK_DIR}"/"${SP_NAME}"/flink; }; then
+    echo_r "extract ${FLINK_TAR} failed."
+    exit 1
+  fi
+
+  # 2) start flink-cluster
+  FLINK_PORT=$("$_RUNJAVA" -cp "${SP_HOME}/lib/*" "$BASH_UTIL" --free_port "8081")
+  # the chosen port has to be written to Flink's own config, since FLINK_PORT is
+  # what gets handed to streampark below as the cluster port
+  "$_RUNJAVA" -cp "${SP_HOME}/lib/*" "$BASH_UTIL" --replace "$FLINK_CONF" "# port: 8081||port: ${FLINK_PORT}"
+
+  bash +x "${FLINK_HOME}"/bin/start-cluster.sh
+fi
+
+# 3) start streampark
+bash +x "${SP_HOME}"/bin/startup.sh \
+  --quickstart flink_home="$FLINK_HOME" \
+  --quickstart flink_port="$FLINK_PORT" \
+  --quickstart flink_name="quickstart-$FLINK_NAME"
