This is an automated email from the ASF dual-hosted git repository.
cancai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 1326b9ff1 [Feat] sync quick-install support from dev-2.1.4 (#3763)
1326b9ff1 is described below
commit 1326b9ff1bab693f8d2d5f58d38ce9e5a779a49c
Author: benjobs <[email protected]>
AuthorDate: Sun Jun 16 20:10:12 2024 +0800
[Feat] sync quick-install support from dev-2.1.4 (#3763)
Co-authored-by: benjobs <[email protected]>
---
.../apache/streampark/common/util/FileUtils.scala | 18 ++
.../streampark/common/util/PropertiesUtils.scala | 93 +++++--
.../src/main/assembly/bin/streampark.sh | 51 ++--
.../console/base/util/BashJavaUtils.java | 90 ++++++-
.../core/controller/FlinkClusterController.java | 6 +-
.../console/core/entity/AppBuildPipeline.java | 9 +-
.../console/core/entity/Application.java | 2 +-
.../console/core/entity/ApplicationConfig.java | 6 +-
.../console/core/entity/FlinkCluster.java | 5 +-
.../console/core/entity/SparkApplication.java | 2 +-
.../console/core/runner/QuickStartRunner.java | 101 ++++++++
.../console/core/service/FlinkClusterService.java | 2 +-
.../core/service/impl/AppBuildPipeServiceImpl.java | 6 +-
.../core/service/impl/FlinkClusterServiceImpl.java | 7 +-
.../service/impl/SparkAppBuildPipeServiceImpl.java | 6 +-
.../core/utils/YarnQueueLabelExpression.java | 3 +-
.../console/system/runner/StartedUpRunner.java | 19 +-
streampark.sh | 282 +++++++++++++++++++++
18 files changed, 623 insertions(+), 85 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index e8fe7ef51..7d508b2c1 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -25,6 +25,7 @@ import java.nio.channels.Channels
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.util
+import java.util.Scanner
import java.util.stream.Collectors
import scala.collection.convert.ImplicitConversions._
@@ -282,4 +283,21 @@ object FileUtils {
null
}
+ @throws[IOException]
+ def readString(file: File): String = {
+ require(file != null && file.isFile)
+ val reader = new FileReader(file)
+ val scanner = new Scanner(reader)
+ val buffer = new mutable.StringBuilder()
+ if (scanner.hasNextLine) {
+ buffer.append(scanner.nextLine())
+ }
+ while (scanner.hasNextLine) {
+ buffer.append("\r\n")
+ buffer.append(scanner.nextLine())
+ }
+ Utils.close(scanner, reader)
+ buffer.toString()
+ }
+
}
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index e3a0ae547..d9a00a6d9 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -70,13 +70,12 @@ object PropertiesUtils extends Logger {
})
.toMap
case text =>
- val value = text match {
- case null => ""
- case other => other.toString
- }
- prefix match {
- case "" => proper += k -> value
- case other => proper += s"$other.$k" -> value
+ if (text != null) {
+ val value = text.toString.trim
+ prefix match {
+ case "" => proper += k -> value
+ case other => proper += s"$other.$k" -> value
+ }
}
proper.toMap
}
@@ -276,7 +275,7 @@ object PropertiesUtils extends Logger {
/** extract flink configuration from application.properties */
@Nonnull def extractDynamicProperties(properties: String): Map[String,
String] = {
- if (StringUtils.isBlank(properties)) Map.empty[String, String]
+ if (StringUtils.isEmpty(properties)) Map.empty[String, String]
else {
val map = mutable.Map[String, String]()
val simple = properties.replaceAll(MULTI_PROPERTY_REGEXP, "")
@@ -308,28 +307,76 @@ object PropertiesUtils extends Logger {
@Nonnull def extractArguments(args: String): List[String] = {
val programArgs = new ArrayBuffer[String]()
if (StringUtils.isNotEmpty(args)) {
- val array = args.split("\\s+")
- val iter = array.iterator
- while (iter.hasNext) {
- val v = iter.next()
- val p = v.take(1)
- p match {
- case "'" | "\"" =>
- var value = v
- if (!v.endsWith(p)) {
- while (!value.endsWith(p) && iter.hasNext) {
- value += s" ${iter.next()}"
- }
+ return extractArguments(args.split("\\s+"))
+ }
+ programArgs.toList
+ }
+
+ def extractArguments(array: Array[String]): List[String] = {
+ val programArgs = new ArrayBuffer[String]()
+ val iter = array.iterator
+ while (iter.hasNext) {
+ val v = iter.next()
+ val p = v.take(1)
+ p match {
+ case "'" | "\"" =>
+ var value = v
+ if (!v.endsWith(p)) {
+ while (!value.endsWith(p) && iter.hasNext) {
+ value += s" ${iter.next()}"
}
- programArgs += value.substring(1, value.length - 1)
- case _ => programArgs += v
- }
+ }
+ programArgs += value.substring(1, value.length - 1)
+ case _ =>
+ val regexp = "(.*)='(.*)'$"
+ if (v.matches(regexp)) {
+ programArgs += v.replaceAll(regexp, "$1=$2")
+ } else {
+ val regexp = "(.*)=\"(.*)\"$"
+ if (v.matches(regexp)) {
+ programArgs += v.replaceAll(regexp, "$1=$2")
+ } else {
+ programArgs += v
+ }
+ }
}
}
programArgs.toList
}
+ def extractMultipleArguments(array: Array[String]): Map[String, Map[String,
String]] = {
+ val iter = array.iterator
+ val map = mutable.Map[String, mutable.Map[String, String]]()
+ while (iter.hasNext) {
+ val v = iter.next()
+ v.take(2) match {
+ case "--" =>
+ val kv = iter.next()
+ val regexp = "(.*)=(.*)"
+ if (kv.matches(regexp)) {
+ val values = kv.split("=")
+ val k1 = values(0).trim
+ val v1 = values(1).replaceAll("^['|\"]|['|\"]$", "")
+ val k = v.drop(2)
+ map.get(k) match {
+ case Some(m) => m += k1 -> v1
+ case _ => map += k -> mutable.Map(k1 -> v1)
+ }
+ }
+ case _ =>
+ }
+ }
+ map.map(x => x._1 -> x._2.toMap).toMap
+ }
+
@Nonnull def extractDynamicPropertiesAsJava(properties: String):
JavaMap[String, String] =
new JavaMap[String, String](extractDynamicProperties(properties).asJava)
+ @Nonnull def extractMultipleArgumentsAsJava(
+ args: Array[String]): JavaMap[String, JavaMap[String, String]] = {
+ val map =
+ extractMultipleArguments(args).map(c => c._1 -> new JavaMap[String,
String](c._2.asJava))
+ new JavaMap[String, JavaMap[String, String]](map.asJava)
+ }
+
}
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
index 7b3e59f87..4df9e5e67 100755
---
a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
+++
b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
@@ -132,7 +132,7 @@ APP_BASE="$APP_HOME"
APP_CONF="$APP_BASE"/conf
APP_LIB="$APP_BASE"/lib
APP_LOG="$APP_BASE"/logs
-APP_PID="$APP_BASE"/streampark.pid
+APP_PID="$APP_BASE"/.pid
APP_OUT="$APP_LOG"/streampark.out
# shellcheck disable=SC2034
APP_TMPDIR="$APP_BASE"/temp
@@ -241,10 +241,16 @@ if [[ "$USE_NOHUP" = "true" ]]; then
NOHUP="nohup"
fi
-BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils"
+CONFIG="${APP_CONF}/config.yaml"
+# shellcheck disable=SC2006
+if [[ ! -f "$CONFIG" ]] ; then
+ echo_r "can not found config.yaml in \"conf\" directory, please check."
+ exit 1;
+fi
+BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils"
APP_MAIN="org.apache.streampark.console.StreamParkConsoleBootstrap"
-
+SERVER_PORT=$($_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --get_yaml "server.port"
"$CONFIG")
JVM_OPTS_FILE=${APP_HOME}/bin/jvm_opts.sh
JVM_ARGS=""
@@ -276,21 +282,8 @@ print_logo() {
printf ' %s WebSite: https://streampark.apache.org%s\n'
$BLUE $RESET
printf ' %s GitHub : http://github.com/apache/streampark%s\n\n'
$BLUE $RESET
printf ' %s ──────── Apache StreamPark, Make stream processing easier
ô~ô!%s\n\n' $PRIMARY $RESET
-}
-
-init_env() {
- # shellcheck disable=SC2006
- CONFIG="${APP_CONF}/application.yml"
- if [[ -f "$CONFIG" ]] ; then
- echo_y """[WARN] in the \"conf\" directory, found the \"application.yml\"
file. The \"application.yml\" file is deprecated.
- For compatibility, this application.yml will be used preferentially.
The latest configuration file is \"config.yaml\". It is recommended to use
\"config.yaml\".
- Note: \"application.yml\" will be completely deprecated in version
2.2.0. """
- else
- CONFIG="${APP_CONF}/config.yaml"
- if [[ ! -f "$CONFIG" ]] ; then
- echo_r "can not found config.yaml in \"conf\" directory, please check."
- exit 1;
- fi
+ if [[ "$1"x == "start"x ]]; then
+ printf ' %s http://localhost:%s %s\n\n'
$PRIMARY $SERVER_PORT $RESET
fi
}
@@ -313,19 +306,19 @@ get_pid() {
fi
# shellcheck disable=SC2006
- local serverPort=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --get_yaml
"server.port" "$CONFIG"`
- if [[ x"${serverPort}" == x"" ]]; then
+ if [[ "${SERVER_PORT}"x == ""x ]]; then
echo_r "server.port is required, please check $CONFIG"
exit 1;
else
# shellcheck disable=SC2006
# shellcheck disable=SC2155
- local used=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --check_port
"$serverPort"`
- if [[ x"${used}" == x"used" ]]; then
+ local used=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --check_port
"$SERVER_PORT"`
+ if [[ "${used}"x == "used"x ]]; then
# shellcheck disable=SC2006
local PID=`jps -l | grep "$APP_MAIN" | awk '{print $1}'`
+ # shellcheck disable=SC2236
if [[ ! -z $PID ]]; then
- echo $PID
+ echo "$PID"
else
echo 0
fi
@@ -411,7 +404,7 @@ start() {
-Dapp.home="${APP_HOME}" \
-Dlogging.config="${APP_CONF}/logback-spring.xml" \
-Djava.io.tmpdir="$APP_TMPDIR" \
- $APP_MAIN >> "$APP_OUT" 2>&1 "&"
+ $APP_MAIN "$@" >> "$APP_OUT" 2>&1 "&"
local PID=$!
local IS_NUMBER="^[0-9]+$"
@@ -565,27 +558,31 @@ restart() {
}
main() {
- print_logo
- init_env
case "$1" in
"debug")
DEBUG_PORT=$2
debug
;;
"start")
- start
+ shift
+ start "$@"
+ [[ $? -eq 0 ]] && print_logo "start"
;;
"start_docker")
+ print_logo
start_docker
;;
"stop")
+ print_logo
stop
;;
"status")
+ print_logo
status
;;
"restart")
restart
+ [[ $? -eq 0 ]] && print_logo "start"
;;
*)
echo_r "Unknown command: $1"
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java
index faf8b8ead..999167fde 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java
@@ -17,15 +17,30 @@
package org.apache.streampark.console.base.util;
+import org.apache.streampark.common.conf.FlinkVersion;
+import org.apache.streampark.common.util.FileUtils;
import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.commons.io.output.NullOutputStream;
+
+import java.io.File;
+import java.io.FileWriter;
import java.io.IOException;
-import java.net.ServerSocket;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.net.Socket;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.Map;
public class BashJavaUtils {
+ private static String localhost = "localhost";
+
public static void main(String[] args) throws IOException {
String action = args[0].toLowerCase();
String[] actionArgs = Arrays.copyOfRange(args, 1, args.length);
@@ -39,12 +54,77 @@ public class BashJavaUtils {
System.out.println(value);
break;
case "--check_port":
- Integer port = Integer.parseInt(actionArgs[0]);
- try {
- new ServerSocket(port);
+ int port = Integer.parseInt(actionArgs[0]);
+ try (Socket ignored = new Socket(localhost, port)) {
+ System.out.println("used");
+ } catch (Exception e) {
System.out.println("free");
+ }
+ break;
+ case "--free_port":
+ int start = Integer.parseInt(actionArgs[0]);
+ for (port = start; port < 65535; port++) {
+ try (Socket ignored = new Socket(localhost, port)) {
+ } catch (Exception e) {
+ System.out.println(port);
+ break;
+ }
+ }
+ break;
+ case "--read_flink":
+ String input = actionArgs[0];
+ String[] inputs = input.split(":");
+ String flinkDist =
+ Arrays.stream(inputs).filter(c ->
c.contains("flink-dist-")).findFirst().get();
+ File flinkHome = new File(flinkDist.replaceAll("/lib/.*", ""));
+ FlinkVersion flinkVersion = new
FlinkVersion(flinkHome.getAbsolutePath());
+
+ PrintStream originalOut = System.out;
+ System.setOut(new PrintStream(new NullOutputStream()));
+
+ String version = flinkVersion.majorVersion();
+ float ver = Float.parseFloat(version);
+ File yaml =
+ new File(flinkHome, ver < 1.19f ? "/conf/flink-conf.yaml" :
"/conf/config.yaml");
+
+ Map<String, String> config =
PropertiesUtils.fromYamlFileAsJava(yaml.getAbsolutePath());
+ String flinkPort = config.getOrDefault("rest.port", "8081");
+ System.setOut(originalOut);
+ System.out.println(
+ flinkHome
+ .getAbsolutePath()
+ .concat(",")
+ .concat(flinkHome.getName())
+ .concat(",")
+ .concat(flinkPort));
+ break;
+ case "--replace":
+ String filePath = actionArgs[0];
+ String[] text = actionArgs[1].split("\\|\\|");
+ String searchText = text[0];
+ String replaceText = text[1];
+ try {
+ File file = new File(filePath);
+ String content = FileUtils.readString(file);
+ content = content.replace(searchText, replaceText);
+ FileWriter writer = new FileWriter(filePath);
+ writer.write(content);
+ writer.flush();
+ writer.close();
+ System.exit(0);
+ } catch (IOException e) {
+ System.exit(1);
+ }
+ break;
+ case "--download":
+ try {
+ URL url = new URL(actionArgs[0]);
+ Path path = Paths.get(actionArgs[1]).toAbsolutePath().normalize();
+ try (InputStream inStream = url.openStream()) {
+ Files.copy(inStream, path, StandardCopyOption.REPLACE_EXISTING);
+ }
} catch (Exception e) {
- System.out.println("used");
+ System.exit(1);
}
break;
default:
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
index 71b297d67..dd796437d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
@@ -23,6 +23,7 @@ import
org.apache.streampark.console.base.exception.InternalException;
import org.apache.streampark.console.core.bean.ResponseResult;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.service.FlinkClusterService;
+import org.apache.streampark.console.core.service.ServiceHelper;
import org.apache.shiro.authz.annotation.RequiresPermissions;
@@ -43,6 +44,8 @@ public class FlinkClusterController {
@Autowired private FlinkClusterService flinkClusterService;
+ @Autowired private ServiceHelper serviceHelper;
+
@PostMapping("availableList")
public RestResponse listAvailableCluster() {
List<FlinkCluster> flinkClusters =
flinkClusterService.listAvailableCluster();
@@ -70,7 +73,8 @@ public class FlinkClusterController {
@PostMapping("create")
@RequiresPermissions("cluster:create")
public RestResponse create(FlinkCluster cluster) {
- Boolean success = flinkClusterService.create(cluster);
+ Long userId = serviceHelper.getUserId();
+ Boolean success = flinkClusterService.create(cluster, userId);
return RestResponse.success(success);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/AppBuildPipeline.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/AppBuildPipeline.java
index 75b5e386c..9314a0714 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/AppBuildPipeline.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/AppBuildPipeline.java
@@ -46,7 +46,6 @@ import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -122,7 +121,7 @@ public class AppBuildPipeline {
@JsonIgnore
public Map<Integer, PipelineStepStatusEnum> getStepStatus() {
if (StringUtils.isBlank(stepStatusJson)) {
- return Collections.emptyMap();
+ return new HashMap<>();
}
try {
return JacksonUtils.read(
@@ -130,7 +129,7 @@ public class AppBuildPipeline {
} catch (JsonProcessingException e) {
log.error(
"json parse error on ApplicationBuildPipeline, stepStatusJson={}",
stepStatusJson, e);
- return Collections.emptyMap();
+ return new HashMap<>();
}
}
@@ -153,7 +152,7 @@ public class AppBuildPipeline {
@JsonIgnore
public Map<Integer, Long> getStepStatusTimestamp() {
if (StringUtils.isBlank(stepStatusTimestampJson)) {
- return Collections.emptyMap();
+ return new HashMap<>();
}
try {
return JacksonUtils.read(
@@ -163,7 +162,7 @@ public class AppBuildPipeline {
"json parse error on ApplicationBuildPipeline, stepStatusJson={}",
stepStatusTimestampJson,
e);
- return Collections.emptyMap();
+ return new HashMap<>();
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 93dacc945..bd6498757 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -438,7 +438,7 @@ public class Application implements Serializable {
@SuppressWarnings("unchecked")
public Map<String, Object> getOptionMap() {
if (StringUtils.isBlank(this.options)) {
- return Collections.emptyMap();
+ return new HashMap<>();
}
Map<String, Object> optionMap = JacksonUtils.read(this.options, Map.class);
optionMap.entrySet().removeIf(entry -> entry.getValue() == null);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
index 5a096fbde..93634c3d0 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
@@ -34,8 +34,8 @@ import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.Nullable;
import java.util.Base64;
-import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
@@ -109,7 +109,7 @@ public class ApplicationConfig {
},
Map.Entry::getValue));
}
- return Collections.emptyMap();
+ return new HashMap<>();
}
@Nullable
@@ -126,7 +126,7 @@ public class ApplicationConfig {
case HOCON:
return
PropertiesUtils.fromHoconTextAsJava(DeflaterUtils.unzipString(this.content));
default:
- return Collections.emptyMap();
+ return new HashMap<>();
}
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index d1ce09e77..0314323b1 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -46,7 +46,6 @@ import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
import java.net.URI;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -136,7 +135,7 @@ public class FlinkCluster implements Serializable {
@SneakyThrows
public Map<String, Object> getOptionMap() {
if (StringUtils.isBlank(this.options)) {
- return Collections.emptyMap();
+ return new HashMap<>();
}
Map<String, Object> optionMap = JacksonUtils.read(this.options, Map.class);
if (FlinkExecutionMode.YARN_SESSION == getFlinkExecutionModeEnum()) {
@@ -167,7 +166,7 @@ public class FlinkCluster implements Serializable {
HttpClientUtils.httpGetRequest(
restUrl, RequestConfig.custom().setConnectTimeout(2000,
TimeUnit.MILLISECONDS).build());
if (StringUtils.isBlank(json)) {
- return Collections.emptyMap();
+ return new HashMap<>();
}
List<Map<String, String>> confList =
JacksonUtils.read(json, new TypeReference<List<Map<String, String>>>()
{});
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
index f8d3d6fad..eb8b6688d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
@@ -422,7 +422,7 @@ public class SparkApplication implements Serializable {
@SuppressWarnings("unchecked")
public Map<String, Object> getOptionMap() {
if (StringUtils.isBlank(this.options)) {
- return Collections.emptyMap();
+ return new HashMap<>();
}
Map<String, Object> optionMap = JacksonUtils.read(this.options, Map.class);
optionMap.entrySet().removeIf(entry -> entry.getValue() == null);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
new file mode 100644
index 000000000..98a215f74
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.runner;
+
+import org.apache.streampark.common.enums.ClusterState;
+import org.apache.streampark.common.enums.FlinkExecutionMode;
+import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.entity.FlinkEnv;
+import org.apache.streampark.console.core.entity.FlinkSql;
+import org.apache.streampark.console.core.service.AppBuildPipeService;
+import org.apache.streampark.console.core.service.FlinkClusterService;
+import org.apache.streampark.console.core.service.FlinkEnvService;
+import org.apache.streampark.console.core.service.FlinkSqlService;
+import
org.apache.streampark.console.core.service.application.ApplicationManageService;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Order
+@Slf4j
+@Component
+public class QuickStartRunner implements ApplicationRunner {
+
+ @Autowired private FlinkEnvService flinkEnvService;
+
+ @Autowired private FlinkClusterService flinkClusterService;
+
+ @Autowired private FlinkSqlService flinkSqlService;
+
+ @Autowired private ApplicationManageService applicationManageService;
+
+ @Autowired private AppBuildPipeService appBuildPipeService;
+
+ private static Long defaultId = 100000L;
+
+ @Override
+ public void run(ApplicationArguments args) throws Exception {
+ Map<String, HashMap<String, String>> map =
+ PropertiesUtils.extractMultipleArgumentsAsJava(args.getSourceArgs());
+
+ Map<String, String> quickstart = map.get("quickstart");
+
+ if (quickstart != null && quickstart.size() == 3) {
+ // 1) create flinkEnv
+ FlinkEnv flinkEnv = new FlinkEnv();
+ flinkEnv.setFlinkName(quickstart.get("flink_name"));
+ flinkEnv.setFlinkHome(quickstart.get("flink_home"));
+ flinkEnvService.create(flinkEnv);
+
+ // 2) create flinkCluster
+ FlinkCluster flinkCluster = new FlinkCluster();
+ flinkCluster.setClusterName("quickstart");
+ flinkCluster.setVersionId(flinkEnv.getId());
+ flinkCluster.setClusterState(ClusterState.RUNNING.getState());
+ flinkCluster.setExecutionMode(FlinkExecutionMode.REMOTE.getMode());
+ flinkCluster.setAddress("http://localhost:" +
quickstart.get("flink_port"));
+ flinkClusterService.create(flinkCluster, defaultId);
+
+ // 3) set flink version and cluster
+ Application app = new Application();
+ app.setId(defaultId);
+ Application application = applicationManageService.getApp(app.getId());
+ application.setFlinkClusterId(flinkCluster.getId());
+ application.setVersionId(flinkEnv.getId());
+ application.setExecutionMode(FlinkExecutionMode.REMOTE.getMode());
+
+ FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(),
true);
+ application.setFlinkSql(flinkSql.getSql());
+
+ boolean success = applicationManageService.update(application);
+ if (success) {
+ // 4) build application
+ appBuildPipeService.buildApplication(defaultId, false);
+ }
+ }
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index 30f0ea881..ba24b5809 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -51,7 +51,7 @@ public interface FlinkClusterService extends
IService<FlinkCluster> {
* @param flinkCluster FlinkCluster to be create
* @return Whether the creation is successful
*/
- Boolean create(FlinkCluster flinkCluster);
+ Boolean create(FlinkCluster flinkCluster, Long userId);
/**
* Remove flink cluster
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index cf4d9adf0..b85ce4f45 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -105,8 +105,8 @@ import javax.annotation.Nonnull;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -615,14 +615,14 @@ public class AppBuildPipeServiceImpl
@Override
public Map<Long, PipelineStatusEnum> listAppIdPipelineStatusMap(List<Long>
appIds) {
if (CollectionUtils.isEmpty(appIds)) {
- return Collections.emptyMap();
+ return new HashMap<>();
}
LambdaQueryWrapper<AppBuildPipeline> queryWrapper =
new
LambdaQueryWrapper<AppBuildPipeline>().in(AppBuildPipeline::getAppId, appIds);
List<AppBuildPipeline> appBuildPipelines =
baseMapper.selectList(queryWrapper);
if (CollectionUtils.isEmpty(appBuildPipelines)) {
- return Collections.emptyMap();
+ return new HashMap<>();
}
return appBuildPipelines.stream()
.collect(Collectors.toMap(AppBuildPipeline::getAppId,
AppBuildPipeline::getPipelineStatus));
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 73ee6370b..e90845312 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -28,7 +28,6 @@ import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.mapper.FlinkClusterMapper;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.FlinkEnvService;
-import org.apache.streampark.console.core.service.ServiceHelper;
import org.apache.streampark.console.core.service.YarnQueueService;
import
org.apache.streampark.console.core.service.application.ApplicationInfoService;
import org.apache.streampark.console.core.watcher.FlinkClusterWatcher;
@@ -82,8 +81,6 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
@Autowired private FlinkEnvService flinkEnvService;
- @Autowired private ServiceHelper serviceHelper;
-
@Autowired private ApplicationInfoService applicationInfoService;
@Autowired private YarnQueueService yarnQueueService;
@@ -141,8 +138,8 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
}
@Override
- public Boolean create(FlinkCluster flinkCluster) {
- flinkCluster.setUserId(serviceHelper.getUserId());
+ public Boolean create(FlinkCluster flinkCluster, Long userId) {
+ flinkCluster.setUserId(userId);
return internalCreate(flinkCluster);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
index ebaaeadd0..620e4e4a5 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
@@ -86,8 +86,8 @@ import javax.annotation.Nonnull;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Date;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -437,14 +437,14 @@ public class SparkAppBuildPipeServiceImpl
@Override
public Map<Long, PipelineStatusEnum> listAppIdPipelineStatusMap(List<Long>
appIds) {
if (CollectionUtils.isEmpty(appIds)) {
- return Collections.emptyMap();
+ return new HashMap<>();
}
LambdaQueryWrapper<AppBuildPipeline> queryWrapper =
new
LambdaQueryWrapper<AppBuildPipeline>().in(AppBuildPipeline::getAppId, appIds);
List<AppBuildPipeline> appBuildPipelines =
baseMapper.selectList(queryWrapper);
if (CollectionUtils.isEmpty(appBuildPipelines)) {
- return Collections.emptyMap();
+ return new HashMap<>();
}
return appBuildPipelines.stream()
.collect(Collectors.toMap(AppBuildPipeline::getAppId,
AppBuildPipeline::getPipelineStatus));
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java
index 9604084f8..f7f54514d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java
@@ -27,7 +27,6 @@ import com.google.common.annotations.VisibleForTesting;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -105,7 +104,7 @@ public class YarnQueueLabelExpression {
public static Map<String, String> getQueueLabelMap(String queueLabelExp) {
if (StringUtils.isBlank(queueLabelExp)) {
- return Collections.emptyMap();
+ return new HashMap<>();
}
YarnQueueLabelExpression yarnQueueLabelExpression = of(queueLabelExp);
Map<String, String> queueLabelMap = new HashMap<>(2);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
index 3228c2266..9a5f05042 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.system.runner;
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.SystemPropertyUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -27,6 +27,8 @@ import
org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
+import java.time.LocalDateTime;
+
@Order
@Slf4j
@Component
@@ -37,7 +39,20 @@ public class StartedUpRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) {
if (context.isActive()) {
- Utils.printLogo("streampark-console start successful");
+ String port = SystemPropertyUtils.get("server.port", "10000");
+ System.out.println("\n");
+ System.out.println(" _____ __
__ ");
+ System.out.println(" / ___// /_________ ____ _____ ___ ____
____ ______/ /__ ");
+ System.out.println(" \\__ \\/ __/ ___/ _ \\/ __ `/ __ `__ \\/ __
\\ __ `/ ___/ //_/");
+ System.out.println(" ___/ / /_/ / / __/ /_/ / / / / / / /_/ / /_/
/ / / ,< ");
+ System.out.println(" /____/\\__/_/ \\___/\\__,_/_/ /_/ /_/
____/\\__,_/_/ /_/|_| ");
+ System.out.println(" /_/
\n\n");
+ System.out.println(" Version: 2.2.0
");
+ System.out.println(" WebSite: https://streampark.apache.org
");
+ System.out.println(" GitHub :
https://github.com/apache/incubator-streampark ");
+ System.out.println(" Info : streampark-console start successful
");
+ System.out.println(" Local : http://localhost:" + port);
+ System.out.println(" Time : " + LocalDateTime.now() + "\n\n");
}
}
}
diff --git a/streampark.sh b/streampark.sh
new file mode 100755
index 000000000..c8ae6379d
--- /dev/null
+++ b/streampark.sh
@@ -0,0 +1,282 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# shellcheck disable=SC2317
+
# Bugzilla 37848: When no TTY is available, don't output to console.
# (This block appeared twice verbatim in the original; the duplicate is removed.)
have_tty=0
# shellcheck disable=SC2006
if [[ "`tty`" != "not a tty" ]]; then
  have_tty=1
fi
+
+ # Only use colors if connected to a terminal
+if [[ ${have_tty} -eq 1 ]]; then
+ RED=$(printf '\033[31m')
+ GREEN=$(printf '\033[32m')
+ BLUE=$(printf '\033[34m')
+ RESET=$(printf '\033[0m')
+else
+ RED=""
+ GREEN=""
+ BLUE=""
+ RESET=""
+fi
+
echo_r () {
  # Red text (errors/failures), prefixed with a blue [StreamPark] tag.
  # Exactly one argument is required; otherwise fail quietly.
  [[ $# -eq 1 ]] || return 1
  # shellcheck disable=SC2059
  printf "[${BLUE}StreamPark${RESET}] ${RED}$1${RESET}\n"
}
+
echo_g () {
  # Green text (success messages), prefixed with a blue [StreamPark] tag.
  # Exactly one argument is required; otherwise fail quietly.
  [[ $# -eq 1 ]] || return 1
  # shellcheck disable=SC2059
  printf "[${BLUE}StreamPark${RESET}] ${GREEN}$1${RESET}\n"
}
+
# OS specific support. $var _must_ be set to either true or false.
cygwin=false;
darwin=false;
mingw=false
case "$(uname)" in
  CYGWIN*) cygwin=true ;;
  MINGW*) mingw=true;;
  Darwin*) darwin=true
    # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
    # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
    if [ -z "$JAVA_HOME" ]; then
      if [ -x "/usr/libexec/java_home" ]; then
        JAVA_HOME="$(/usr/libexec/java_home)"; export JAVA_HOME
      else
        JAVA_HOME="/Library/Java/Home"; export JAVA_HOME
      fi
    fi
    ;;
esac

# Gentoo installs Java via java-config; derive JAVA_HOME from it when unset.
if [ -z "$JAVA_HOME" ] ; then
  if [ -r /etc/gentoo-release ] ; then
    JAVA_HOME=$(java-config --jre-home)
  fi
fi
+
# For Cygwin, ensure paths are in UNIX format before anything is touched
if $cygwin ; then
  [ -n "$JAVA_HOME" ] &&
    JAVA_HOME=$(cygpath --unix "$JAVA_HOME")
  [ -n "$CLASSPATH" ] &&
    CLASSPATH=$(cygpath --path --unix "$CLASSPATH")
fi

# For Mingw, ensure paths are in UNIX format before anything is touched
if $mingw ; then
  # Canonicalize JAVA_HOME by cd-ing into it; abort if the directory is gone.
  [ -n "$JAVA_HOME" ] && [ -d "$JAVA_HOME" ] &&
    JAVA_HOME="$(cd "$JAVA_HOME" || (echo_r "cannot cd into $JAVA_HOME."; exit 1); pwd)"
fi
+
# Still no JAVA_HOME: derive it from the location of javac on the PATH.
if [ -z "$JAVA_HOME" ]; then
  javaExecutable="$(which javac)"
  if [ -n "$javaExecutable" ] && ! [ "$(expr "\"$javaExecutable\"" : '\([^ ]*\)')" = "no" ]; then
    # readlink(1) is not available as standard on Solaris 10.
    readLink=$(which readlink)
    if [ ! "$(expr "$readLink" : '\([^ ]*\)')" = "no" ]; then
      if $darwin ; then
        # On macOS resolve the physical directory (pwd -P) instead of
        # following symlinks with readlink.
        javaHome="$(dirname "\"$javaExecutable\"")"
        javaExecutable="$(cd "\"$javaHome\"" && pwd -P)/javac"
      else
        javaExecutable="$(readlink -f "\"$javaExecutable\"")"
      fi
      # Strip the trailing /bin component to obtain the JDK root.
      javaHome="$(dirname "\"$javaExecutable\"")"
      javaHome=$(expr "$javaHome" : '\(.*\)/bin')
      JAVA_HOME="$javaHome"
      export JAVA_HOME
    fi
  fi
fi
+
# Resolve the java command: prefer $JAVA_HOME layouts, fall back to the PATH.
if [ -z "$JAVACMD" ] ; then
  if [ -n "$JAVA_HOME" ] ; then
    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
      # IBM's JDK on AIX uses strange locations for the executables
      JAVACMD="$JAVA_HOME/jre/sh/java"
    else
      JAVACMD="$JAVA_HOME/bin/java"
    fi
  else
    JAVACMD="$(\unset -f command 2>/dev/null; \command -v java)"
  fi
fi

if [ ! -x "$JAVACMD" ] ; then
  echo_r "Error: JAVA_HOME is not defined correctly." >&2
  echo_r "  We cannot execute $JAVACMD" >&2
  exit 1
fi

if [ -z "$JAVA_HOME" ] ; then
  echo_r "Warning: JAVA_HOME environment variable is not set."
fi

# Use the resolved $JAVACMD rather than $JAVA_HOME/bin/java: JAVA_HOME may be
# unset (only a warning above) or use a non-standard layout (IBM JDK), both of
# which $JAVACMD already handles and which was verified executable just above.
_RUNJAVA="$JAVACMD"
+
# resolve links - $0 may be a softlink
PRG="$0"

# Follow the symlink chain portably (readlink -f is not universal) until PRG
# is the real script path.
while [[ -h "$PRG" ]]; do
  # shellcheck disable=SC2006
  ls=`ls -ld "$PRG"`
  # shellcheck disable=SC2006
  link=`expr "$ls" : '.*-> \(.*\)$'`
  if expr "$link" : '/.*' > /dev/null; then
    PRG="$link"
  else
    # Relative link target: resolve it against the link's own directory.
    # shellcheck disable=SC2006
    PRG=`dirname "$PRG"`/"$link"
  fi
done

# Get standard environment variables
# shellcheck disable=SC2006
PRG_DIR=`dirname "$PRG"`
WORK_DIR=$(cd "$PRG_DIR" >/dev/null || exit; pwd)
+
+SP_VERSION="2.1.5"
+SP_NAME="apache-streampark_2.12-${SP_VERSION}-incubating-bin"
+SP_TAR="${SP_NAME}.tar.gz"
+SP_URL="https://archive.apache.org/dist/incubator/streampark/${SP_VERSION}/${SP_TAR}"
+SP_HOME="${WORK_DIR}"/"${SP_NAME}"
+SP_PATH="${WORK_DIR}"/"${SP_TAR}"
+SP_CONFIG="${SP_HOME}/conf/config.yaml"
+
# Download $1 (url) to $3 (path), using $2 (name) in error messages.
# Tries wget, then curl, then a temporary one-shot Java downloader.
# On any failure the partial file is removed and the script exits 1.
download() {
  local url=$1
  local name=$2
  local path=$3
  if command -v wget > /dev/null; then
    # Test wget's own exit status; running "|| rm -f" first would make $?
    # reflect rm and silently mask a failed download.
    if ! wget "$url" -O "$path"; then
      rm -f "$path"
      echo_r "download $name failed, please try again."
      exit 1
    fi
  elif command -v curl > /dev/null; then
    if ! curl -o "$path" "$url" -f -L; then
      rm -f "$path"
      echo_r "download $name failed, please try again."
      exit 1
    fi
  else
    # Neither wget nor curl available: compile and run a minimal Java downloader.
    echo "
    import java.io.InputStream;
    import java.net.URL;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;

    public class Downloader {
      public static void main(String[] args) {
        try {
          URL url = new URL(args[0]);
          Path path = Paths.get(args[1]).toAbsolutePath().normalize();
          try (InputStream inStream = url.openStream()) {
            Files.copy(inStream, path, StandardCopyOption.REPLACE_EXISTING);
          }
        } catch (Exception e) {
          System.exit(1);
        }
      }
    }" > "${WORK_DIR}"/Downloader.java

    # Check compilation explicitly: the original ran java even when javac failed.
    if ! "$JAVA_HOME/bin/javac" "${WORK_DIR}"/Downloader.java; then
      rm -f "${WORK_DIR}"/Downloader.java
      echo_r "download $name failed, please try again."
      exit 1
    fi
    rm -f "${WORK_DIR}"/Downloader.java

    if ! "$JAVA_HOME/bin/java" -cp "${WORK_DIR}" Downloader "$url" "$path"; then
      rm -f "${WORK_DIR}"/Downloader.class
      echo_r "download $name failed, please try again."
      exit 1
    fi
    rm -f "${WORK_DIR}"/Downloader.class
  fi
}
+
BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils"

# 1). download streampark.
echo_g "download streampark..."

download "$SP_URL" "$SP_TAR" "$SP_PATH"
# Unpack and prepare the layout used below (flink/ and workspace/ directories).
# NOTE(review): mkdir without -p fails if a previous run left these behind.
tar -xvf "${SP_TAR}" >/dev/null 2>&1 \
  && rm -r "${SP_TAR}" \
  && mkdir "${SP_HOME}"/flink \
  && mkdir "${SP_HOME}"/workspace

# 1.1) workspace: point the local workspace at the freshly created directory.
$_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --replace "$SP_CONFIG" "local: ||local: ${SP_HOME}/workspace #"

# 1.2) port: pick a free port (starting at 10000) and write it into the config.
SP_PORT=$($_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --free_port "10000")
$_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --replace "$SP_CONFIG" "port: 10000||port: ${SP_PORT}"

# 2). flink: reuse an already-running standalone session cluster if one is
# found in the process table, otherwise download flink and start one.
# shellcheck disable=SC2009
FLINK_PROCESS="$(ps -ef | grep "flink-dist-" | grep 'org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint')"
if [[ -n "${FLINK_PROCESS}" ]]; then
  # BashJavaUtils --read_flink extracts "home,name,port" from the JVM cmdline.
  FLINK_PARAM=$($_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --read_flink "$FLINK_PROCESS")
  IFS=',' read -r -a ARRAY <<< "$FLINK_PARAM"
  FLINK_HOME=${ARRAY[0]}
  FLINK_NAME=${ARRAY[1]}
  FLINK_PORT=${ARRAY[2]}
else
  FLINK_NAME="flink-1.19.0"
  FLINK_URL="https://archive.apache.org/dist/flink/${FLINK_NAME}/${FLINK_NAME}-bin-scala_2.12.tgz"
  FLINK_TAR="${FLINK_NAME}-bin-scala_2.12.tgz"
  FLINK_HOME="${WORK_DIR}"/${SP_NAME}/flink/${FLINK_NAME}
  FLINK_PATH="${WORK_DIR}"/"${FLINK_TAR}"
  # NOTE(review): FLINK_CONF is assigned but never used; the port replacement
  # below edits $SP_CONFIG — confirm whether it should target $FLINK_CONF
  # (flink's own conf/config.yaml) instead.
  FLINK_CONF="${FLINK_HOME}/conf/config.yaml"

  # 1) download flink
  echo_g "download flink..."
  download "$FLINK_URL" "$FLINK_TAR" "$FLINK_PATH"
  tar -xvf "${FLINK_TAR}" >/dev/null 2>&1 \
    && rm -r "${FLINK_TAR}" \
    && mv "$FLINK_NAME" "${WORK_DIR}"/"${SP_NAME}"/flink

  # 2) start flink-cluster on a free port (starting at 8081).
  FLINK_PORT=$($_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --free_port "8081")
  $_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --replace "$SP_CONFIG" "# port: 8081||port: ${FLINK_PORT}"

  bash +x "${FLINK_HOME}"/bin/start-cluster.sh
fi

# 3) start streampark, handing it the flink cluster to register at startup.
bash +x "${SP_HOME}"/bin/startup.sh \
  --quickstart flink_home="$FLINK_HOME" \
  --quickstart flink_port="$FLINK_PORT" \
  --quickstart flink_name="quickstart-$FLINK_NAME"