This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new b1e7d75eb [Feature] streampark quick install script support (#3748)
b1e7d75eb is described below
commit b1e7d75ebca72178c35775e257c10105ef93f7dd
Author: benjobs <[email protected]>
AuthorDate: Fri Jun 14 09:06:23 2024 +0800
[Feature] streampark quick install script support (#3748)
---
README.md | 45 ++--
docker/README.md | 18 +-
.../streampark/common/util/PropertiesUtils.scala | 82 ++++--
.../src/main/assembly/bin/streampark.sh | 56 ++--
.../console/base/util/BashJavaUtils.java | 90 ++++++-
.../ApplicationBuildPipelineController.java | 52 +---
.../core/controller/FlinkClusterController.java | 5 +-
.../console/core/entity/Application.java | 2 +-
.../console/core/runner/QuickStartRunner.java | 103 ++++++++
.../console/core/service/ApplicationService.java | 5 +-
.../console/core/service/FlinkClusterService.java | 2 +-
.../core/service/impl/ApplicationServiceImpl.java | 50 ++++
.../core/service/impl/FlinkClusterServiceImpl.java | 4 +-
.../console/system/runner/StartedUpRunner.java | 19 +-
streampark.sh | 282 +++++++++++++++++++++
15 files changed, 679 insertions(+), 136 deletions(-)
diff --git a/README.md b/README.md
index 415a33117..683b9c874 100644
--- a/README.md
+++ b/README.md
@@ -26,14 +26,13 @@
<div align="center">
-[](https://www.apache.org/licenses/LICENSE-2.0.html)
-[](https://github.com/apache/incubator-streampark/stargazers)
+[](https://github.com/apache/incubator-streampark/stargazers)
[](https://github.com/apache/incubator-streampark/releases)
-[](https://streampark.apache.org/download)
-[](https://twitter.com/ASFStreamPark)
+[](https://streampark.apache.org/download)
+[](https://twitter.com/ASFStreamPark)
**[Website](https://streampark.apache.org)** |
-**[Document](https://streampark.apache.org/docs/intro)** |
+**[Official Documentation](https://streampark.apache.org/docs/get-started/intro)** |
**[FAQ](https://github.com/apache/incubator-streampark/issues/507)**

@@ -41,17 +40,15 @@
</div>
-## What is StreamPark?
+## Abstract
-<h4>StreamPark is a stream processing development framework and professional management platform. </h4>
+----
+<h4>Apache StreamPark is a stream processing development framework and professional management platform. </h4>
-> StreamPark is a streaming application development framework. Aimed at ease building and managing streaming applications, StreamPark provides development framework for writing stream processing application with Apache Flink and Apache Spark, More other engines will be supported in the future. Also, StreamPark is a professional management platform for streaming application
-, including application development, debugging, interactive query, deployment, operation, maintenance, etc. It was initially known as StreamX and renamed to StreamPark in August 2022.
+> Apache StreamPark is a streaming application development framework. Aimed at ease building and managing streaming applications, StreamPark provides development framework for writing stream processing application with Apache Flink and Apache Spark, More other engines will be supported in the future. Also, StreamPark is a professional management platform for streaming application, including application development, debugging, interactive query, deployment, operation, maintenance, etc. It [...]
-## Features
-
-* Apache Flink & Spark application development scaffold
-* Support multiple versions of Flink & Spark
+* Apache Flink & Apache Spark application development scaffold
+* Support multiple versions of Apache Flink & Apache Spark
* Wide range of out-of-the-box connectors
* One-stop stream processing operation platform
* Support catalog、olap、streaming-warehouse etc.
@@ -60,10 +57,11 @@

## QuickStart
+
- [Start with Docker](docker/README.md)
- [Start with Kubernetes](helm/README.md)
-Click [Document](https://streampark.apache.org/docs/user-guide/quick-start) for more information
+Click [Official Documentation](https://streampark.apache.org/docs/framework/quick-start) for more information
## How to Build
@@ -72,10 +70,12 @@ git clone git@github.com:apache/incubator-streampark.git
cd incubator-streampark
./build.sh
```
-Details: how to [Development](https://streampark.apache.org/docs/user-guide/development)
+
+Details: how to [Development](https://streampark.apache.org/docs/development/development)
+
## Downloads
-Download address for run-directly software package : [https://streampark.apache.org/download](https://streampark.apache.org/download)
+Download address for run-directly software package: https://streampark.apache.org/download
## Our users
@@ -89,21 +89,28 @@ Various companies and organizations use StreamPark for research, production and
### Submit Pull Request and Issues
-You can submit any ideas as [pull requests](https://github.com/apache/incubator-streampark/pulls) or as [GitHub issues](https://github.com/apache/incubator-streampark/issues/new/choose).
+You can submit any ideas as [pull requests](https://github.com/apache/incubator-streampark/pulls) or as [issues](https://github.com/apache/incubator-streampark/issues/new/choose).
> If you're new to posting issues, we ask that you read [*How To Ask Questions
> The Smart Way*](http://www.catb.org/~esr/faqs/smart-questions.html) (**This
> guide does not provide actual support services for this project!**), [How to
> Report Bugs
> Effectively](http://www.chiark.greenend.org.uk/~sgtatham/bugs.html) prior to
> posting. Well written bug reports help us help you!
### How to Contribute
-We welcome your suggestions, comments (including criticisms), comments and contributions. See [How to Contribute](https://streampark.apache.org/community/submit_guide/submit_code) and [Code Submission Guide](https://streampark.apache.org/community/submit_guide/submit_code)
+We welcome your suggestions, comments (including criticisms), comments and contributions. See [How to Contribute](https://streampark.apache.org/community/submit_guide/submit_code) and [Code Submission Guide](https://streampark.apache.org/community/submit_guide/code_style_and_quality_guide)
+
+### Subscribe Mailing Lists
+Mail List is the most recognized form of communication in Apache community. See how to [Join the Mailing Lists](https://streampark.apache.org/community/contribution_guide/mailing_lists)
Thank you to all the people who already contributed to StreamPark!
[](https://github.com/apache/incubator-streampark/graphs/contributors)
+## License
+
+Licensed under the [Apache License, Version 2.0](LICENSE)
+
## Social Media
-- [Twitter](https://twitter.com/ASFStreamPark)
+- [X (Twitter)](https://twitter.com/ASFStreamPark)
- [Zhihu](https://www.zhihu.com/people/streampark) (in Chinese)
- [bilibili](https://space.bilibili.com/455330087) (in Chinese)
- WeChat Official Account (in Chinese, scan the QR code to follow)
diff --git a/docker/README.md b/docker/README.md
index 486bb9849..9b2b114e7 100644
--- a/docker/README.md
+++ b/docker/README.md
@@ -1,13 +1,23 @@
-# Deploy StreamPark on Docker
-### 1. docker-compose up
+### 1. Confirm release version
+When releasing the new version, the release manager will verify the image tag and push the image to the image repository,
+The latest image tag will be written to [docker-compose.yaml](./docker-compose.yaml), users also can independently verify whether the version of the StreamPark image in the [docker-compose.yaml](./docker-compose.yaml) file is correct (If the current branch has not been released, the image tag is the last release image tag):
+
+```yaml
+version: '3.8'
+services:
+ streampark-console:
+ image: apache/streampark:2.2.0
+```
+
+### 2. docker-compose up
```shell
docker-compose up -d
```
-### 2. open in browser
+### 3. open in browser
http://localhost:10000
-#### [more detail](https://streampark.apache.org/docs/user-guide/docker-deployment)
+#### [more detail](https://streampark.apache.org/docs/get-started/docker-deployment)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index 03aa13145..6adc03cf7 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -31,7 +31,6 @@ import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
-import scala.util.Try
object PropertiesUtils extends Logger {
@@ -308,39 +307,76 @@ object PropertiesUtils extends Logger {
@Nonnull def extractArguments(args: String): List[String] = {
val programArgs = new ArrayBuffer[String]()
if (StringUtils.isNotEmpty(args)) {
- val array = args.split("\\s+")
- val iter = array.iterator
- while (iter.hasNext) {
- val v = iter.next()
- val p = v.take(1)
- p match {
- case "'" | "\"" =>
- var value = v
- if (!v.endsWith(p)) {
- while (!value.endsWith(p) && iter.hasNext) {
- value += s" ${iter.next()}"
- }
+ return extractArguments(args.split("\\s+"))
+ }
+ programArgs.toList
+ }
+
+ def extractArguments(array: Array[String]): List[String] = {
+ val programArgs = new ArrayBuffer[String]()
+ val iter = array.iterator
+ while (iter.hasNext) {
+ val v = iter.next()
+ val p = v.take(1)
+ p match {
+ case "'" | "\"" =>
+ var value = v
+ if (!v.endsWith(p)) {
+ while (!value.endsWith(p) && iter.hasNext) {
+ value += s" ${iter.next()}"
}
- programArgs += value.substring(1, value.length - 1)
- case _ =>
- val regexp = "(.*)='(.*)'$"
+ }
+ programArgs += value.substring(1, value.length - 1)
+ case _ =>
+ val regexp = "(.*)='(.*)'$"
+ if (v.matches(regexp)) {
+ programArgs += v.replaceAll(regexp, "$1=$2")
+ } else {
+ val regexp = "(.*)=\"(.*)\"$"
if (v.matches(regexp)) {
programArgs += v.replaceAll(regexp, "$1=$2")
} else {
- val regexp = "(.*)=\"(.*)\"$"
- if (v.matches(regexp)) {
- programArgs += v.replaceAll(regexp, "$1=$2")
- } else {
- programArgs += v
- }
+ programArgs += v
}
- }
+ }
}
}
programArgs.toList
}
+ def extractMultipleArguments(array: Array[String]): Map[String, Map[String, String]] = {
+ val iter = array.iterator
+ val map = mutable.Map[String, mutable.Map[String, String]]()
+ while (iter.hasNext) {
+ val v = iter.next()
+ v.take(2) match {
+ case "--" =>
+ val kv = iter.next()
+ val regexp = "(.*)=(.*)"
+ if (kv.matches(regexp)) {
+ val values = kv.split("=")
+ val k1 = values(0).trim
+ val v1 = values(1).replaceAll("^['|\"]|['|\"]$", "")
+ val k = v.drop(2)
+ map.get(k) match {
+ case Some(m) => m += k1 -> v1
+ case _ => map += k -> mutable.Map(k1 -> v1)
+ }
+ }
+ case _ =>
+ }
+ }
+ map.map(x => x._1 -> x._2.toMap).toMap
+ }
+
@Nonnull def extractDynamicPropertiesAsJava(properties: String): JavaMap[String, String] =
new JavaMap[String, String](extractDynamicProperties(properties).asJava)
+ @Nonnull def extractMultipleArgumentsAsJava(
+ args: Array[String]): JavaMap[String, JavaMap[String, String]] = {
+ val map =
+ extractMultipleArguments(args).map(c => c._1 -> new JavaMap[String, String](c._2.asJava))
+ new JavaMap[String, JavaMap[String, String]](map.asJava)
+ }
+
}
diff --git a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
index 81af07693..5e0e550ce 100755
--- a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
+++ b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
@@ -241,10 +241,23 @@ if [[ "$USE_NOHUP" = "true" ]]; then
NOHUP="nohup"
fi
-BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils"
+CONFIG="${APP_CONF}/application.yml"
+# shellcheck disable=SC2006
+if [[ -f "$CONFIG" ]] ; then
+ echo_y """[WARN] in the \"conf\" directory, found the \"application.yml\" file. The \"application.yml\" file is deprecated.
+ For compatibility, this application.yml will be used preferentially. The latest configuration file is \"config.yaml\". It is recommended to use \"config.yaml\".
+ Note: \"application.yml\" will be completely deprecated in version 2.2.0. """
+else
+ CONFIG="${APP_CONF}/config.yaml"
+ if [[ ! -f "$CONFIG" ]] ; then
+ echo_r "can not found config.yaml in \"conf\" directory, please check."
+ exit 1;
+ fi
+fi
+BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils"
APP_MAIN="org.apache.streampark.console.StreamParkConsoleBootstrap"
-
+SERVER_PORT=$($_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --get_yaml "server.port" "$CONFIG")
JVM_OPTS_FILE=${APP_HOME}/bin/jvm_opts.sh
JVM_ARGS=""
@@ -276,21 +289,8 @@ print_logo() {
printf ' %s WebSite: https://streampark.apache.org%s\n' $BLUE $RESET
printf ' %s GitHub : http://github.com/apache/streampark%s\n\n' $BLUE $RESET
printf ' %s ──────── Apache StreamPark, Make stream processing easier ô~ô!%s\n\n' $PRIMARY $RESET
-}
-
-init_env() {
- # shellcheck disable=SC2006
- CONFIG="${APP_CONF}/application.yml"
- if [[ -f "$CONFIG" ]] ; then
- echo_y """[WARN] in the \"conf\" directory, found the \"application.yml\" file. The \"application.yml\" file is deprecated.
- For compatibility, this application.yml will be used preferentially. The latest configuration file is \"config.yaml\". It is recommended to use \"config.yaml\".
- Note: \"application.yml\" will be completely deprecated in version 2.2.0. """
- else
- CONFIG="${APP_CONF}/config.yaml"
- if [[ ! -f "$CONFIG" ]] ; then
- echo_r "can not found config.yaml in \"conf\" directory, please check."
- exit 1;
- fi
+ if [[ "$1"x == "start"x ]]; then
+ printf ' %s http://localhost:%s %s\n\n' $PRIMARY $SERVER_PORT $RESET
fi
}
@@ -313,19 +313,19 @@ get_pid() {
fi
# shellcheck disable=SC2006
- local serverPort=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --get_yaml "server.port" "$CONFIG"`
- if [[ x"${serverPort}" == x"" ]]; then
+ if [[ "${SERVER_PORT}"x == ""x ]]; then
echo_r "server.port is required, please check $CONFIG"
exit 1;
else
# shellcheck disable=SC2006
# shellcheck disable=SC2155
- local used=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --check_port "$serverPort"`
- if [[ x"${used}" == x"used" ]]; then
+ local used=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --check_port "$SERVER_PORT"`
+ if [[ "${used}"x == "used"x ]]; then
# shellcheck disable=SC2006
local PID=`jps -l | grep "$APP_MAIN" | awk '{print $1}'`
+ # shellcheck disable=SC2236
if [[ ! -z $PID ]]; then
- echo $PID
+ echo "$PID"
else
echo 0
fi
@@ -411,7 +411,7 @@ start() {
-Dapp.home="${APP_HOME}" \
-Dlogging.config="${APP_CONF}/logback-spring.xml" \
-Djava.io.tmpdir="$APP_TMPDIR" \
- $APP_MAIN >> "$APP_OUT" 2>&1 "&"
+ $APP_MAIN "$@" >> "$APP_OUT" 2>&1 "&"
local PID=$!
local IS_NUMBER="^[0-9]+$"
@@ -565,27 +565,31 @@ restart() {
}
main() {
- print_logo
- init_env
case "$1" in
"debug")
DEBUG_PORT=$2
debug
;;
"start")
- start
+ shift
+ start "$@"
+ [[ $? -eq 0 ]] && print_logo "start"
;;
"start_docker")
+ print_logo
start_docker
;;
"stop")
+ print_logo
stop
;;
"status")
+ print_logo
status
;;
"restart")
restart
+ [[ $? -eq 0 ]] && print_logo "start"
;;
*)
echo_r "Unknown command: $1"
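The startup script above now picks its config file up front: prefer a legacy `application.yml` with a deprecation warning, otherwise require `config.yaml` and abort if neither exists. A minimal sketch of that same fallback logic outside of bash (illustrative Python only; the function name is invented, not part of the project):

```python
from pathlib import Path

def resolve_config(conf_dir: str) -> Path:
    """Mirror the script's selection order: prefer the deprecated
    application.yml if present (for compatibility), else require config.yaml."""
    legacy = Path(conf_dir) / "application.yml"
    if legacy.is_file():
        print('[WARN] "application.yml" is deprecated; please migrate to "config.yaml".')
        return legacy
    current = Path(conf_dir) / "config.yaml"
    if not current.is_file():
        # same failure mode as the script's echo_r + exit 1
        raise FileNotFoundError('can not find config.yaml in "conf" directory')
    return current
```

Doing this resolution once, before any subcommand runs, is what lets the script read `server.port` early and print the console URL in the banner after a successful start.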
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java
index faf8b8ead..999167fde 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java
@@ -17,15 +17,30 @@
package org.apache.streampark.console.base.util;
+import org.apache.streampark.common.conf.FlinkVersion;
+import org.apache.streampark.common.util.FileUtils;
import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.commons.io.output.NullOutputStream;
+
+import java.io.File;
+import java.io.FileWriter;
import java.io.IOException;
-import java.net.ServerSocket;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.net.Socket;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.Map;
public class BashJavaUtils {
+ private static String localhost = "localhost";
+
public static void main(String[] args) throws IOException {
String action = args[0].toLowerCase();
String[] actionArgs = Arrays.copyOfRange(args, 1, args.length);
@@ -39,12 +54,77 @@ public class BashJavaUtils {
System.out.println(value);
break;
case "--check_port":
- Integer port = Integer.parseInt(actionArgs[0]);
- try {
- new ServerSocket(port);
+ int port = Integer.parseInt(actionArgs[0]);
+ try (Socket ignored = new Socket(localhost, port)) {
+ System.out.println("used");
+ } catch (Exception e) {
System.out.println("free");
+ }
+ break;
+ case "--free_port":
+ int start = Integer.parseInt(actionArgs[0]);
+ for (port = start; port < 65535; port++) {
+ try (Socket ignored = new Socket(localhost, port)) {
+ } catch (Exception e) {
+ System.out.println(port);
+ break;
+ }
+ }
+ break;
+ case "--read_flink":
+ String input = actionArgs[0];
+ String[] inputs = input.split(":");
+ String flinkDist =
+ Arrays.stream(inputs).filter(c -> c.contains("flink-dist-")).findFirst().get();
+ File flinkHome = new File(flinkDist.replaceAll("/lib/.*", ""));
+ FlinkVersion flinkVersion = new FlinkVersion(flinkHome.getAbsolutePath());
+
+ PrintStream originalOut = System.out;
+ System.setOut(new PrintStream(new NullOutputStream()));
+
+ String version = flinkVersion.majorVersion();
+ float ver = Float.parseFloat(version);
+ File yaml =
+ new File(flinkHome, ver < 1.19f ? "/conf/flink-conf.yaml" : "/conf/config.yaml");
+
+ Map<String, String> config = PropertiesUtils.fromYamlFileAsJava(yaml.getAbsolutePath());
+ String flinkPort = config.getOrDefault("rest.port", "8081");
+ System.setOut(originalOut);
+ System.out.println(
+ flinkHome
+ .getAbsolutePath()
+ .concat(",")
+ .concat(flinkHome.getName())
+ .concat(",")
+ .concat(flinkPort));
+ break;
+ case "--replace":
+ String filePath = actionArgs[0];
+ String[] text = actionArgs[1].split("\\|\\|");
+ String searchText = text[0];
+ String replaceText = text[1];
+ try {
+ File file = new File(filePath);
+ String content = FileUtils.readString(file);
+ content = content.replace(searchText, replaceText);
+ FileWriter writer = new FileWriter(filePath);
+ writer.write(content);
+ writer.flush();
+ writer.close();
+ System.exit(0);
+ } catch (IOException e) {
+ System.exit(1);
+ }
+ break;
+ case "--download":
+ try {
+ URL url = new URL(actionArgs[0]);
+ Path path = Paths.get(actionArgs[1]).toAbsolutePath().normalize();
+ try (InputStream inStream = url.openStream()) {
+ Files.copy(inStream, path, StandardCopyOption.REPLACE_EXISTING);
+ }
} catch (Exception e) {
- System.out.println("used");
+ System.exit(1);
}
break;
default:
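The reworked `--check_port` above probes as a client (a successful connect means something is listening, so the port is "used"), and the new `--free_port` scans upward for the first port that refuses a connection. A small Python sketch of the same client-side probing technique (an illustration under those assumptions; the function names are invented, not the project's API):

```python
import socket

def port_in_use(port: int, host: str = "localhost") -> bool:
    """Client-side probe: a successful connect means a listener exists ("used")."""
    try:
        with socket.create_connection((host, port), timeout=1):
            return True
    except OSError:
        # connection refused / timed out -> nothing listening ("free")
        return False

def first_free_port(start: int, host: str = "localhost") -> int:
    """Scan upward from `start` for the first port with no listener."""
    for port in range(start, 65535):
        if not port_in_use(port, host):
            return port
    raise RuntimeError("no free port found")
```

The design choice mirrors the diff's switch from `new ServerSocket(port)` (try to bind) to `new Socket(localhost, port)` (try to connect): connecting detects an in-use port without the probe itself ever occupying it.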
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
index b86a63cfb..5b2361baa 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
@@ -18,13 +18,9 @@
package org.apache.streampark.console.core.controller;
import org.apache.streampark.console.base.domain.RestResponse;
-import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.core.annotation.PermissionScope;
import org.apache.streampark.console.core.bean.AppBuildDockerResolvedDetail;
import org.apache.streampark.console.core.entity.AppBuildPipeline;
-import org.apache.streampark.console.core.entity.Application;
-import org.apache.streampark.console.core.entity.ApplicationLog;
-import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.service.AppBuildPipeService;
import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.ApplicationService;
@@ -42,7 +38,6 @@ import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
-import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -74,52 +69,7 @@ public class ApplicationBuildPipelineController {
@PostMapping(value = "build")
@RequiresPermissions("app:create")
public RestResponse buildApplication(Long appId, boolean forceBuild) throws Exception {
- Application app = applicationService.getById(appId);
-
- ApiAlertException.throwIfNull(
- app.getVersionId(), "Please bind a Flink version to the current flink job.");
- // 1) check flink version
- FlinkEnv env = flinkEnvService.getById(app.getVersionId());
- boolean checkVersion = env.getFlinkVersion().checkVersion(false);
- if (!checkVersion) {
- throw new ApiAlertException("Unsupported flink version: " + env.getFlinkVersion().version());
- }
-
- // 2) check env
- boolean envOk = applicationService.checkEnv(app);
- if (!envOk) {
- throw new ApiAlertException(
- "Check flink env failed, please check the flink version of this job");
- }
-
- if (!forceBuild && !appBuildPipeService.allowToBuildNow(appId)) {
- throw new ApiAlertException(
- "The job is invalid, or the job cannot be built while it is running");
- }
- // check if you need to go through the build process (if the jar and pom have changed,
- // you need to go through the build process, if other common parameters are modified,
- // you don't need to go through the build process)
-
- ApplicationLog applicationLog = new ApplicationLog();
- applicationLog.setOptionName(
- org.apache.streampark.console.core.enums.Operation.RELEASE.getValue());
- applicationLog.setAppId(app.getId());
- applicationLog.setOptionTime(new Date());
-
- boolean needBuild = applicationService.checkBuildAndUpdate(app);
- if (!needBuild) {
- applicationLog.setSuccess(true);
- applicationLogService.save(applicationLog);
- return RestResponse.success(true);
- }
-
- // rollback
- if (app.isNeedRollback() && app.isFlinkSqlJob()) {
- flinkSqlService.rollback(app);
- }
-
- boolean actionResult = appBuildPipeService.buildApplication(app, applicationLog);
- return RestResponse.success(actionResult);
+ return applicationService.buildApplication(appId, forceBuild);
}
/**
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
index 04c4386ba..99b92b6ae 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
@@ -22,6 +22,7 @@ import org.apache.streampark.console.base.exception.InternalException;
import org.apache.streampark.console.core.bean.ResponseResult;
import org.apache.streampark.console.core.entity.FlinkCluster;
import org.apache.streampark.console.core.service.FlinkClusterService;
+import org.apache.streampark.console.core.service.ServiceHelper;
import org.apache.shiro.authz.annotation.RequiresPermissions;
@@ -42,6 +43,8 @@ public class FlinkClusterController {
@Autowired private FlinkClusterService flinkClusterService;
+ @Autowired private ServiceHelper serviceHelper;
+
@PostMapping("list")
public RestResponse list() {
List<FlinkCluster> flinkClusters = flinkClusterService.listCluster();
@@ -63,7 +66,7 @@ public class FlinkClusterController {
@PostMapping("create")
@RequiresPermissions("cluster:create")
public RestResponse create(FlinkCluster cluster) {
- Boolean success = flinkClusterService.create(cluster);
+ Boolean success = flinkClusterService.create(cluster, serviceHelper.getUserId());
return RestResponse.success(success);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 88db35fa1..f43058d9e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -456,7 +456,7 @@ public class Application implements Serializable {
@SuppressWarnings("unchecked")
public Map<String, Object> getOptionMap() {
if (StringUtils.isBlank(this.options)) {
- return Collections.emptyMap();
+ return new HashMap<>();
}
Map<String, Object> map = JacksonUtils.read(this.options, Map.class);
map.entrySet().removeIf(entry -> entry.getValue() == null);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
new file mode 100644
index 000000000..34efdf079
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.runner;
+
+import org.apache.streampark.common.enums.ClusterState;
+import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.entity.FlinkEnv;
+import org.apache.streampark.console.core.entity.FlinkSql;
+import org.apache.streampark.console.core.service.ApplicationService;
+import org.apache.streampark.console.core.service.FlinkClusterService;
+import org.apache.streampark.console.core.service.FlinkEnvService;
+import org.apache.streampark.console.core.service.FlinkSqlService;
+import org.apache.streampark.console.system.service.UserService;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Order
+@Slf4j
+@Component
+public class QuickStartRunner implements ApplicationRunner {
+
+ @Autowired private FlinkEnvService flinkEnvService;
+
+ @Autowired private ApplicationService applicationService;
+
+ @Autowired private FlinkClusterService flinkClusterService;
+
+ @Autowired private FlinkSqlService flinkSqlService;
+
+ @Autowired private UserService userService;
+
+ private static Long defaultId = 100000L;
+
+ @Override
+ public void run(ApplicationArguments args) throws Exception {
+ Map<String, HashMap<String, String>> map =
+ PropertiesUtils.extractMultipleArgumentsAsJava(args.getSourceArgs());
+ Map<String, String> quickstart = map.get("quickstart");
+
+ if (quickstart != null && quickstart.size() == 3) {
+
+ userService.setLastTeam(defaultId, defaultId);
+
+ // 1) create flinkEnv
+ FlinkEnv flinkEnv = new FlinkEnv();
+ flinkEnv.setFlinkName(quickstart.get("flink_name"));
+ flinkEnv.setFlinkHome(quickstart.get("flink_home"));
+ flinkEnvService.create(flinkEnv);
+
+ // 2) create flinkCluster
+ FlinkCluster flinkCluster = new FlinkCluster();
+ flinkCluster.setClusterName("quickstart");
+ flinkCluster.setVersionId(flinkEnv.getId());
+ flinkCluster.setClusterState(ClusterState.STARTED.getValue());
+ flinkCluster.setExecutionMode(ExecutionMode.REMOTE.getMode());
+ flinkCluster.setAddress("http://localhost:" + quickstart.get("flink_port"));
+ flinkClusterService.create(flinkCluster, defaultId);
+
+ // 3) set flink version and cluster
+ Application app = new Application();
+ app.setId(defaultId);
+ Application application = applicationService.getApp(app);
+ application.setFlinkClusterId(flinkCluster.getId());
+ application.setVersionId(flinkEnv.getId());
+ application.setExecutionMode(ExecutionMode.REMOTE.getMode());
+
+ FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), true);
+ application.setFlinkSql(flinkSql.getSql());
+
+ boolean success = applicationService.update(application);
+ if (success) {
+ // 4) build application
+ applicationService.buildApplication(defaultId, false);
+ }
+ }
+ }
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index b9ce6d768..4d37ce836 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -19,6 +19,7 @@ package org.apache.streampark.console.core.service;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.exception.ApplicationException;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.enums.AppExistsState;
@@ -67,7 +68,7 @@ public interface ApplicationService extends IService<Application> {
String readConf(String config) throws IOException;
- Application getApp(Application app);
+ Application getApp(Application application);
String getMain(Application application);
@@ -128,4 +129,6 @@ public interface ApplicationService extends IService<Application> {
AppExistsState checkStart(Application app);
List<ApplicationReport> getYARNApplication(String appName);
+
+ RestResponse buildApplication(Long appId, boolean forceBuild) throws Exception;
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index d82387d04..a96cd4e16 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -30,7 +30,7 @@ public interface FlinkClusterService extends IService<FlinkCluster> {
ResponseResult check(FlinkCluster flinkCluster);
- Boolean create(FlinkCluster flinkCluster);
+ Boolean create(FlinkCluster flinkCluster, Long userId);
void delete(Long id);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index dcdae2437..6e4f6e7ad 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -34,6 +34,7 @@ import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.common.util.YarnUtils;
import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.streampark.console.base.exception.ApiDetailException;
import org.apache.streampark.console.base.exception.ApplicationException;
@@ -1948,6 +1949,55 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
}
}
+ @Override
+ public RestResponse buildApplication(Long appId, boolean forceBuild) throws Exception {
+ Application app = this.getById(appId);
+
+ ApiAlertException.throwIfNull(
+ app.getVersionId(), "Please bind a Flink version to the current flink job.");
+ // 1) check flink version
+ FlinkEnv env = flinkEnvService.getById(app.getVersionId());
+ boolean checkVersion = env.getFlinkVersion().checkVersion(false);
+ if (!checkVersion) {
+ throw new ApiAlertException("Unsupported flink version: " + env.getFlinkVersion().version());
+ }
+
+ // 2) check env
+ boolean envOk = this.checkEnv(app);
+ if (!envOk) {
+ throw new ApiAlertException(
+ "Check flink env failed, please check the flink version of this job");
+ }
+
+ if (!forceBuild && !appBuildPipeService.allowToBuildNow(appId)) {
+ throw new ApiAlertException(
+ "The job is invalid, or the job cannot be built while it is running");
+ }
+ // check if you need to go through the build process (if the jar and pom have changed,
+ // you need to go through the build process, if other common parameters are modified,
+ // you don't need to go through the build process)
+
+ ApplicationLog applicationLog = new ApplicationLog();
+ applicationLog.setOptionName(
+ org.apache.streampark.console.core.enums.Operation.RELEASE.getValue());
+ applicationLog.setAppId(app.getId());
+ applicationLog.setOptionTime(new Date());
+
+ boolean needBuild = this.checkBuildAndUpdate(app);
+ if (!needBuild) {
+ applicationLog.setSuccess(true);
+ applicationLogService.save(applicationLog);
+ return RestResponse.success(true);
+ }
+
+ // rollback
+ if (app.isNeedRollback() && app.isFlinkSqlJob()) {
+ flinkSqlService.rollback(app);
+ }
+ boolean actionResult = appBuildPipeService.buildApplication(app, applicationLog);
+ return RestResponse.success(actionResult);
+ }
+
private Tuple2<String, String> getNamespaceClusterId(Application application) {
String clusterId = null;
String k8sNamespace = null;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index efc2d295a..1cc9a82f4 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -139,8 +139,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
}
@Override
- public Boolean create(FlinkCluster flinkCluster) {
- flinkCluster.setUserId(serviceHelper.getUserId());
+ public Boolean create(FlinkCluster flinkCluster, Long userId) {
+ flinkCluster.setUserId(userId);
boolean successful = validateQueueIfNeeded(flinkCluster);
ApiAlertException.throwIfFalse(
successful, String.format(ERROR_CLUSTER_QUEUE_HINT, flinkCluster.getYarnQueue()));
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
index 4a19b709f..93c0c5490 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.system.runner;
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.util.SystemPropertyUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -27,6 +27,8 @@ import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
+import java.time.LocalDateTime;
+
@Order
@Slf4j
@Component
@@ -37,7 +39,20 @@ public class StartedUpRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) {
if (context.isActive()) {
-      ConfigConst.printLogo("streampark-console start successful");
+      String port = SystemPropertyUtils.get("server.port", "10000");
+      System.out.println("\n");
+      System.out.println("       _____ __                                             __       ");
+      System.out.println("      / ___// /_________  ____ _____ ___  ____  ____ ______/ /__     ");
+      System.out.println("      \\__ \\/ __/ ___/ _ \\/ __ `/ __ `__ \\/ __ \\/ __ `/ ___/ //_/     ");
+      System.out.println("     ___/ / /_/ / /  __/ /_/ / / / / / / / /_/ / /_/ / /  / ,<       ");
+      System.out.println("    /____/\\__/_/  \\___/\\__,_/_/ /_/ /_/ .___/\\__,_/_/  /_/|_|        ");
+      System.out.println("                                      /_/                            \n\n");
+      System.out.println("    Version:  2.1.5 ");
+      System.out.println("    WebSite:  https://streampark.apache.org ");
+      System.out.println("    GitHub :  https://github.com/apache/incubator-streampark ");
+      System.out.println("    Info   :  streampark-console start successful ");
+      System.out.println("    Local  :  http://localhost:" + port);
+      System.out.println("    Time   :  " + LocalDateTime.now() + "\n\n");
}
}
}
diff --git a/streampark.sh b/streampark.sh
new file mode 100755
index 000000000..c8ae6379d
--- /dev/null
+++ b/streampark.sh
@@ -0,0 +1,282 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# shellcheck disable=SC2317
+
+# Bugzilla 37848: When no TTY is available, don't output to console
+have_tty=0
+# shellcheck disable=SC2006
+if [[ "`tty`" != "not a tty" ]]; then
+ have_tty=1
+fi
+
+ # Only use colors if connected to a terminal
+if [[ ${have_tty} -eq 1 ]]; then
+ RED=$(printf '\033[31m')
+ GREEN=$(printf '\033[32m')
+ BLUE=$(printf '\033[34m')
+ RESET=$(printf '\033[0m')
+else
+ RED=""
+ GREEN=""
+ BLUE=""
+ RESET=""
+fi
+
+echo_r () {
+ # Color red: Error, Failed
+ [[ $# -ne 1 ]] && return 1
+ # shellcheck disable=SC2059
+ printf "[%sStreamPark%s] %s$1%s\n" "$BLUE" "$RESET" "$RED" "$RESET"
+}
+
+echo_g () {
+ # Color green: Success
+ [[ $# -ne 1 ]] && return 1
+ # shellcheck disable=SC2059
+ printf "[%sStreamPark%s] %s$1%s\n" "$BLUE" "$RESET" "$GREEN" "$RESET"
+}
+
+# OS specific support. $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+mingw=false
+case "$(uname)" in
+ CYGWIN*) cygwin=true ;;
+ MINGW*) mingw=true;;
+ Darwin*) darwin=true
+ # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
+ # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
+ if [ -z "$JAVA_HOME" ]; then
+ if [ -x "/usr/libexec/java_home" ]; then
+ JAVA_HOME="$(/usr/libexec/java_home)"; export JAVA_HOME
+ else
+ JAVA_HOME="/Library/Java/Home"; export JAVA_HOME
+ fi
+ fi
+ ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+ if [ -r /etc/gentoo-release ] ; then
+ JAVA_HOME=$(java-config --jre-home)
+ fi
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+ [ -n "$JAVA_HOME" ] &&
+ JAVA_HOME=$(cygpath --unix "$JAVA_HOME")
+ [ -n "$CLASSPATH" ] &&
+ CLASSPATH=$(cygpath --path --unix "$CLASSPATH")
+fi
+
+# For Mingw, ensure paths are in UNIX format before anything is touched
+if $mingw ; then
+ [ -n "$JAVA_HOME" ] && [ -d "$JAVA_HOME" ] &&
+ JAVA_HOME="$(cd "$JAVA_HOME" || (echo_r "cannot cd into $JAVA_HOME."; exit 1); pwd)"
+fi
+
+if [ -z "$JAVA_HOME" ]; then
+ javaExecutable="$(which javac)"
+ if [ -n "$javaExecutable" ] && ! [ "$(expr "\"$javaExecutable\"" : '\([^ ]*\)')" = "no" ]; then
+ # readlink(1) is not available as standard on Solaris 10.
+ readLink=$(which readlink)
+ if [ ! "$(expr "$readLink" : '\([^ ]*\)')" = "no" ]; then
+ if $darwin ; then
+ javaHome="$(dirname "\"$javaExecutable\"")"
+ javaExecutable="$(cd "\"$javaHome\"" && pwd -P)/javac"
+ else
+ javaExecutable="$(readlink -f "\"$javaExecutable\"")"
+ fi
+ javaHome="$(dirname "\"$javaExecutable\"")"
+ javaHome=$(expr "$javaHome" : '\(.*\)/bin')
+ JAVA_HOME="$javaHome"
+ export JAVA_HOME
+ fi
+ fi
+fi
+
+if [ -z "$JAVACMD" ] ; then
+ if [ -n "$JAVA_HOME" ] ; then
+ if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+ # IBM's JDK on AIX uses strange locations for the executables
+ JAVACMD="$JAVA_HOME/jre/sh/java"
+ else
+ JAVACMD="$JAVA_HOME/bin/java"
+ fi
+ else
+ JAVACMD="$(\unset -f command 2>/dev/null; \command -v java)"
+ fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+ echo_r "Error: JAVA_HOME is not defined correctly." >&2
+ echo_r " We cannot execute $JAVACMD" >&2
+ exit 1
+fi
+
+if [ -z "$JAVA_HOME" ] ; then
+ echo_r "Warning: JAVA_HOME environment variable is not set."
+fi
+
+_RUNJAVA="$JAVA_HOME/bin/java"
+
+# resolve links - $0 may be a softlink
+PRG="$0"
+
+while [[ -h "$PRG" ]]; do
+ # shellcheck disable=SC2006
+ ls=`ls -ld "$PRG"`
+ # shellcheck disable=SC2006
+ link=`expr "$ls" : '.*-> \(.*\)$'`
+ if expr "$link" : '/.*' > /dev/null; then
+ PRG="$link"
+ else
+ # shellcheck disable=SC2006
+ PRG=`dirname "$PRG"`/"$link"
+ fi
+done
+
+# Get standard environment variables
+# shellcheck disable=SC2006
+PRG_DIR=`dirname "$PRG"`
+WORK_DIR=$(cd "$PRG_DIR" >/dev/null || exit; pwd)
+
+SP_VERSION="2.1.5"
+SP_NAME="apache-streampark_2.12-${SP_VERSION}-incubating-bin"
+SP_TAR="${SP_NAME}.tar.gz"
+SP_URL="https://archive.apache.org/dist/incubator/streampark/${SP_VERSION}/${SP_TAR}"
+SP_HOME="${WORK_DIR}"/"${SP_NAME}"
+SP_PATH="${WORK_DIR}"/"${SP_TAR}"
+SP_CONFIG="${SP_HOME}/conf/config.yaml"
+
+download() {
+ local url=$1
+ local name=$2
+ local path=$3
+ if command -v wget > /dev/null; then
+ wget "$url" -O "$path" || rm -f "$path"
+ # shellcheck disable=SC2181
+ if [[ $? -ne 0 ]]; then
+ echo_r "download $name failed, please try again."
+ exit 1
+ fi
+ elif command -v curl > /dev/null; then
+ curl -o "$path" "$url" -f -L || rm -f "$path"
+ # shellcheck disable=SC2181
+ if [[ $? -ne 0 ]]; then
+ echo_r "download $name failed, please try again."
+ exit 1
+ fi
+ else
+ echo "
+ import java.io.InputStream;
+ import java.net.URL;
+ import java.nio.file.Files;
+ import java.nio.file.Path;
+ import java.nio.file.Paths;
+ import java.nio.file.StandardCopyOption;
+
+ public class Downloader {
+ public static void main(String[] args) {
+ try {
+ URL url = new URL(args[0]);
+ Path path = Paths.get(args[1]).toAbsolutePath().normalize();
+ try (InputStream inStream = url.openStream()) {
+ Files.copy(inStream, path, StandardCopyOption.REPLACE_EXISTING);
+ }
+ } catch (Exception e) {
+ System.exit(1);
+ }
+ }
+ }" > "${WORK_DIR}"/Downloader.java
+
+ "$JAVA_HOME/bin/javac" "${WORK_DIR}"/Downloader.java && rm -f "${WORK_DIR}"/Downloader.java
+
+ "$JAVA_HOME/bin/java" -cp "${WORK_DIR}" Downloader "$url" "$path" && rm -f "${WORK_DIR}"/Downloader.class
+
+ if [[ $? -ne 0 ]]; then
+ echo_r "download $name failed, please try again."
+ exit 1
+ fi
+ fi
+}
+
+BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils"
+
+# 1). download streampark.
+echo_g "download streampark..."
+
+download "$SP_URL" "$SP_TAR" "$SP_PATH"
+tar -xvf "${SP_TAR}" >/dev/null 2>&1 \
+ && rm -r "${SP_TAR}" \
+ && mkdir "${SP_HOME}"/flink \
+ && mkdir "${SP_HOME}"/workspace
+
+# 1.1) workspace
+$_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --replace "$SP_CONFIG" "local: ||local: ${SP_HOME}/workspace #"
+
+# 1.2) port.
+SP_PORT=$($_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --free_port "10000")
+$_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --replace "$SP_CONFIG" "port: 10000||port: ${SP_PORT}"
+
+# 2). flink
+# shellcheck disable=SC2009
+FLINK_PROCESS="$(ps -ef | grep "flink-dist-" | grep 'org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint')"
+if [[ -n "${FLINK_PROCESS}" ]]; then
+ FLINK_PARAM=$($_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --read_flink "$FLINK_PROCESS")
+ IFS=',' read -r -a ARRAY <<< "$FLINK_PARAM"
+ FLINK_HOME=${ARRAY[0]}
+ FLINK_NAME=${ARRAY[1]}
+ FLINK_PORT=${ARRAY[2]}
+else
+ FLINK_NAME="flink-1.19.0"
+ FLINK_URL="https://archive.apache.org/dist/flink/${FLINK_NAME}/${FLINK_NAME}-bin-scala_2.12.tgz"
+ FLINK_TAR="${FLINK_NAME}-bin-scala_2.12.tgz"
+ FLINK_HOME="${WORK_DIR}"/${SP_NAME}/flink/${FLINK_NAME}
+ FLINK_PATH="${WORK_DIR}"/"${FLINK_TAR}"
+ FLINK_CONF="${FLINK_HOME}/conf/config.yaml"
+
+ # 1) download flink
+ echo_g "download flink..."
+ download "$FLINK_URL" "$FLINK_TAR" "$FLINK_PATH"
+ tar -xvf "${FLINK_TAR}" >/dev/null 2>&1 \
+ && rm -r "${FLINK_TAR}" \
+ && mv "$FLINK_NAME" "${WORK_DIR}"/"${SP_NAME}"/flink
+
+ # 2) start flink-cluster
+ FLINK_PORT=$($_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --free_port "8081")
+ $_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --replace "$FLINK_CONF" "# port: 8081||port: ${FLINK_PORT}"
+
+ bash +x "${FLINK_HOME}"/bin/start-cluster.sh
+fi
+
+# 3) start streampark
+bash +x "${SP_HOME}"/bin/startup.sh \
+ --quickstart flink_home="$FLINK_HOME" \
+ --quickstart flink_port="$FLINK_PORT" \
+ --quickstart flink_name="quickstart-$FLINK_NAME"
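For context, the script above shells out to the console's `BashJavaUtils` class for two small jobs: finding a free TCP port (`--free_port`) and rewriting config entries expressed as `"old||new"` pairs (`--replace`). The sketch below shows roughly equivalent pure-bash stand-ins. This is illustrative only — the function names here are invented for the example, and the real logic lives in `org.apache.streampark.console.base.util.BashJavaUtils`, not in this script.

```shell
#!/bin/bash

# Rough stand-in for: $BASH_UTIL --free_port <base>
# Probes ports upward from <base>; a /dev/tcp connect only succeeds
# when something is already listening, so the first failed connect
# marks a free port.
free_port() {
  local port=$1
  while (exec 3<>"/dev/tcp/127.0.0.1/${port}") 2>/dev/null; do
    port=$((port + 1))
  done
  echo "$port"
}

# Rough stand-in for: $BASH_UTIL --replace <file> "old||new"
# Replaces every literal occurrence of "old" with "new" in <file>.
replace_in_file() {
  local file=$1 pair=$2
  local old=${pair%%||*} new=${pair#*||}
  local content
  content=$(<"$file")
  content=${content//"$old"/$new}   # literal find/replace, no regex
  printf '%s\n' "$content" > "$file"
}
```

Usage mirrors the script's own calls, e.g. `replace_in_file "$SP_CONFIG" "port: 10000||port: ${SP_PORT}"`.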