This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch quickstart
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 6486e343c90b480dbdf3166a55bbe3ff63089139
Author: benjobs <[email protected]>
AuthorDate: Tue Jun 11 17:51:39 2024 +0800

    [Feat] quick install script support.
---
 .../conf/config.yaml                               |  88 ++++++++
 .../streampark/common/util/PropertiesUtils.scala   |  82 +++++--
 .../src/main/assembly/bin/streampark.sh            |  56 ++---
 .../console/base/util/BashJavaUtils.java           |  75 ++++++-
 .../ApplicationBuildPipelineController.java        |  52 +----
 .../core/controller/FlinkClusterController.java    |   5 +-
 .../console/core/entity/Application.java           |   2 +-
 .../console/core/runner/QuickStartRunner.java      |  97 ++++++++
 .../console/core/service/ApplicationService.java   |   5 +-
 .../console/core/service/FlinkClusterService.java  |   2 +-
 .../core/service/impl/ApplicationServiceImpl.java  |  50 +++++
 .../core/service/impl/FlinkClusterServiceImpl.java |   4 +-
 .../console/system/runner/StartedUpRunner.java     |  19 +-
 streampark.sh                                      | 248 +++++++++++++++++++++
 14 files changed, 671 insertions(+), 114 deletions(-)

diff --git a/apache-streampark_2.12-2.1.4-incubating-bin/conf/config.yaml 
b/apache-streampark_2.12-2.1.4-incubating-bin/conf/config.yaml
new file mode 100644
index 000000000..c6845eaad
--- /dev/null
+++ b/apache-streampark_2.12-2.1.4-incubating-bin/conf/config.yaml
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+logging:
+  level:
+    root: info
+
+server:
+  port: 10000
+  session:
+    # The user's login session has a validity period. If it exceeds this time, 
the user will be automatically logged out
+    # unit: s|m|h|d, s: second, m:minute, h:hour, d: day
+    ttl: 2h # unit[s|m|h|d], e.g: 24h, 2d....
+  undertow: # see: 
https://github.com/undertow-io/undertow/blob/master/core/src/main/java/io/undertow/Undertow.java
+    buffer-size: 1024
+    direct-buffers: true
+    threads:
+      io: 16
+      worker: 256
+
+# system database, default h2, mysql|pgsql|h2
+datasource:
+  dialect: h2  #h2, mysql, pgsql
+  # if datasource.dialect is mysql or pgsql, you need to configure the 
following connection information
+  # mysql/postgresql connect user
+  username:
+  # mysql/postgresql connect password
+  password:
+  # mysql/postgresql connect jdbcURL
+  # mysql example: datasource.url: 
jdbc:mysql://localhost:3306/streampark?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8
+  # postgresql example: 
jdbc:postgresql://localhost:5432/streampark?stringtype=unspecified
+  url:
+
+streampark:
+  workspace:
+    # Local workspace, storage directory of clone projects and compiled 
projects. Do not set it under $APP_HOME; set it to a directory outside of $APP_HOME.
+    local: /tmp/streampark
+    # The root hdfs path of the jars, Same as yarn.provided.lib.dirs for flink 
on yarn-application and Same as --jars for spark on yarn
+    remote: hdfs:///streampark/
+  proxy:
+    # lark proxy address, default https://open.feishu.cn
+    lark-url:
+    # hadoop yarn proxy path, e.g: knox process address 
https://streampark.com:8443/proxy/yarn
+    yarn-url:
+  yarn:
+    # flink on yarn or spark on yarn, monitoring job status from yarn, it is 
necessary to set hadoop.http.authentication.type
+    http-auth: 'simple'  # default simple, or kerberos
+  # flink on yarn or spark on yarn, HADOOP_USER_NAME
+  hadoop-user-name: hdfs
+  project:
+    # Number of projects allowed to be running at the same time. If there is 
no limit, -1 can be configured.
+    max-build: 16
+
+# flink on yarn or spark on yarn, when the hadoop cluster enable kerberos 
authentication, it is necessary to set Kerberos authentication parameters.
+security:
+  kerberos:
+    login:
+      debug: false
+      enable: false
+      keytab:
+      krb5:
+      principal:
+    ttl: 2h # unit [s|m|h|d]
+
+# sign streampark with ldap.
+ldap:
+  base-dn: dc=streampark,dc=com  # Login Account
+  enable: false  # ldap enabled
+  username: cn=Manager,dc=streampark,dc=com
+  password: streampark
+  urls: ldap://99.99.99.99:389 #AD server IP, default port 389
+  user:
+    email-attribute: mail
+    identity-attribute: uid
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index 03aa13145..6adc03cf7 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -31,7 +31,6 @@ import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
-import scala.util.Try
 
 object PropertiesUtils extends Logger {
 
@@ -308,39 +307,76 @@ object PropertiesUtils extends Logger {
   @Nonnull def extractArguments(args: String): List[String] = {
     val programArgs = new ArrayBuffer[String]()
     if (StringUtils.isNotEmpty(args)) {
-      val array = args.split("\\s+")
-      val iter = array.iterator
-      while (iter.hasNext) {
-        val v = iter.next()
-        val p = v.take(1)
-        p match {
-          case "'" | "\"" =>
-            var value = v
-            if (!v.endsWith(p)) {
-              while (!value.endsWith(p) && iter.hasNext) {
-                value += s" ${iter.next()}"
-              }
+      return extractArguments(args.split("\\s+"))
+    }
+    programArgs.toList
+  }
+
+  def extractArguments(array: Array[String]): List[String] = {
+    val programArgs = new ArrayBuffer[String]()
+    val iter = array.iterator
+    while (iter.hasNext) {
+      val v = iter.next()
+      val p = v.take(1)
+      p match {
+        case "'" | "\"" =>
+          var value = v
+          if (!v.endsWith(p)) {
+            while (!value.endsWith(p) && iter.hasNext) {
+              value += s" ${iter.next()}"
             }
-            programArgs += value.substring(1, value.length - 1)
-          case _ =>
-            val regexp = "(.*)='(.*)'$"
+          }
+          programArgs += value.substring(1, value.length - 1)
+        case _ =>
+          val regexp = "(.*)='(.*)'$"
+          if (v.matches(regexp)) {
+            programArgs += v.replaceAll(regexp, "$1=$2")
+          } else {
+            val regexp = "(.*)=\"(.*)\"$"
             if (v.matches(regexp)) {
               programArgs += v.replaceAll(regexp, "$1=$2")
             } else {
-              val regexp = "(.*)=\"(.*)\"$"
-              if (v.matches(regexp)) {
-                programArgs += v.replaceAll(regexp, "$1=$2")
-              } else {
-                programArgs += v
-              }
+              programArgs += v
             }
-        }
+          }
       }
     }
     programArgs.toList
   }
 
+  def extractMultipleArguments(array: Array[String]): Map[String, Map[String, 
String]] = {
+    val iter = array.iterator
+    val map = mutable.Map[String, mutable.Map[String, String]]()
+    while (iter.hasNext) {
+      val v = iter.next()
+      v.take(2) match {
+        case "--" =>
+          val kv = iter.next()
+          val regexp = "(.*)=(.*)"
+          if (kv.matches(regexp)) {
+            val values = kv.split("=")
+            val k1 = values(0).trim
+            val v1 = values(1).replaceAll("^['|\"]|['|\"]$", "")
+            val k = v.drop(2)
+            map.get(k) match {
+              case Some(m) => m += k1 -> v1
+              case _ => map += k -> mutable.Map(k1 -> v1)
+            }
+          }
+        case _ =>
+      }
+    }
+    map.map(x => x._1 -> x._2.toMap).toMap
+  }
+
   @Nonnull def extractDynamicPropertiesAsJava(properties: String): 
JavaMap[String, String] =
     new JavaMap[String, String](extractDynamicProperties(properties).asJava)
 
+  @Nonnull def extractMultipleArgumentsAsJava(
+      args: Array[String]): JavaMap[String, JavaMap[String, String]] = {
+    val map =
+      extractMultipleArguments(args).map(c => c._1 -> new JavaMap[String, 
String](c._2.asJava))
+    new JavaMap[String, JavaMap[String, String]](map.asJava)
+  }
+
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
 
b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
index 81af07693..5e0e550ce 100755
--- 
a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
@@ -241,10 +241,23 @@ if [[ "$USE_NOHUP" = "true" ]]; then
   NOHUP="nohup"
 fi
 
-BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils"
+CONFIG="${APP_CONF}/application.yml"
+# shellcheck disable=SC2006
+if [[ -f "$CONFIG" ]] ; then
+  echo_y """[WARN] in the \"conf\" directory, found the \"application.yml\" 
file. The \"application.yml\" file is deprecated.
+     For compatibility, this application.yml will be used preferentially. The 
latest configuration file is \"config.yaml\". It is recommended to use 
\"config.yaml\".
+     Note: \"application.yml\" will be completely deprecated in version 2.2.0. 
"""
+else
+  CONFIG="${APP_CONF}/config.yaml"
+  if [[ ! -f "$CONFIG" ]] ; then
+    echo_r "can not found config.yaml in \"conf\" directory, please check."
+    exit 1;
+  fi
+fi
 
+BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils"
 APP_MAIN="org.apache.streampark.console.StreamParkConsoleBootstrap"
-
+SERVER_PORT=$($_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --get_yaml "server.port" 
"$CONFIG")
 JVM_OPTS_FILE=${APP_HOME}/bin/jvm_opts.sh
 
 JVM_ARGS=""
@@ -276,21 +289,8 @@ print_logo() {
   printf '      %s   WebSite:  https://streampark.apache.org%s\n'              
                    $BLUE   $RESET
   printf '      %s   GitHub :  http://github.com/apache/streampark%s\n\n'      
                    $BLUE   $RESET
   printf '      %s   ──────── Apache StreamPark, Make stream processing easier 
ô~ô!%s\n\n'         $PRIMARY  $RESET
-}
-
-init_env() {
-  # shellcheck disable=SC2006
-  CONFIG="${APP_CONF}/application.yml"
-  if [[ -f "$CONFIG" ]] ; then
-    echo_y """[WARN] in the \"conf\" directory, found the \"application.yml\" 
file. The \"application.yml\" file is deprecated.
-       For compatibility, this application.yml will be used preferentially. 
The latest configuration file is \"config.yaml\". It is recommended to use 
\"config.yaml\".
-       Note: \"application.yml\" will be completely deprecated in version 
2.2.0. """
-  else
-    CONFIG="${APP_CONF}/config.yaml"
-    if [[ ! -f "$CONFIG" ]] ; then
-      echo_r "can not found config.yaml in \"conf\" directory, please check."
-      exit 1;
-    fi
+  if [[ "$1"x == "start"x ]]; then
+    printf '      %s                   http://localhost:%s %s\n\n'             
                    $PRIMARY $SERVER_PORT   $RESET
   fi
 }
 
@@ -313,19 +313,19 @@ get_pid() {
   fi
 
   # shellcheck disable=SC2006
-  local serverPort=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --get_yaml 
"server.port" "$CONFIG"`
-  if [[ x"${serverPort}" == x"" ]]; then
+  if [[ "${SERVER_PORT}"x == ""x ]]; then
     echo_r "server.port is required, please check $CONFIG"
     exit 1;
   else
      # shellcheck disable=SC2006
       # shellcheck disable=SC2155
-      local used=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --check_port 
"$serverPort"`
-      if [[ x"${used}" == x"used" ]]; then
+      local used=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --check_port 
"$SERVER_PORT"`
+      if [[ "${used}"x == "used"x ]]; then
         # shellcheck disable=SC2006
         local PID=`jps -l | grep "$APP_MAIN" | awk '{print $1}'`
+        # shellcheck disable=SC2236
         if [[ ! -z $PID ]]; then
-          echo $PID
+          echo "$PID"
         else
           echo 0
         fi
@@ -411,7 +411,7 @@ start() {
     -Dapp.home="${APP_HOME}" \
     -Dlogging.config="${APP_CONF}/logback-spring.xml" \
     -Djava.io.tmpdir="$APP_TMPDIR" \
-    $APP_MAIN >> "$APP_OUT" 2>&1 "&"
+    $APP_MAIN "$@" >> "$APP_OUT" 2>&1 "&"
 
     local PID=$!
     local IS_NUMBER="^[0-9]+$"
@@ -565,27 +565,31 @@ restart() {
 }
 
 main() {
-  print_logo
-  init_env
   case "$1" in
     "debug")
         DEBUG_PORT=$2
         debug
         ;;
     "start")
-        start
+        shift
+        start "$@"
+        [[ $? -eq 0 ]] && print_logo "start"
         ;;
     "start_docker")
+        print_logo
         start_docker
         ;;
     "stop")
+        print_logo
         stop
         ;;
     "status")
+        print_logo
         status
         ;;
     "restart")
         restart
+        [[ $? -eq 0 ]] && print_logo "start"
         ;;
     *)
         echo_r "Unknown command: $1"
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java
index faf8b8ead..53492b436 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java
@@ -17,15 +17,24 @@
 
 package org.apache.streampark.console.base.util;
 
+import org.apache.streampark.common.conf.FlinkVersion;
+import org.apache.streampark.common.util.FileUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
 
+import org.apache.commons.io.output.NullOutputStream;
+
+import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
-import java.net.ServerSocket;
+import java.io.PrintStream;
+import java.net.Socket;
 import java.util.Arrays;
 import java.util.Map;
 
 public class BashJavaUtils {
 
+  private static String localhost = "localhost";
+
   public static void main(String[] args) throws IOException {
     String action = args[0].toLowerCase();
     String[] actionArgs = Arrays.copyOfRange(args, 1, args.length);
@@ -39,12 +48,66 @@ public class BashJavaUtils {
         System.out.println(value);
         break;
       case "--check_port":
-        Integer port = Integer.parseInt(actionArgs[0]);
-        try {
-          new ServerSocket(port);
-          System.out.println("free");
-        } catch (Exception e) {
+        int port = Integer.parseInt(actionArgs[0]);
+        try (Socket ignored = new Socket(localhost, port)) {
           System.out.println("used");
+        } catch (Exception e) {
+          System.out.println("free");
+        }
+        break;
+      case "--free_port":
+        int start = Integer.parseInt(actionArgs[0]);
+        for (port = start; port < 65535; port++) {
+          try (Socket ignored = new Socket(localhost, port)) {
+          } catch (Exception e) {
+            System.out.println(port);
+            break;
+          }
+        }
+        break;
+      case "--read_flink":
+        String input = actionArgs[0];
+        String[] inputs = input.split(":");
+        String flinkDist =
+            Arrays.stream(inputs).filter(c -> 
c.contains("flink-dist-")).findFirst().get();
+        File flinkHome = new File(flinkDist.replaceAll("/lib/.*", ""));
+        FlinkVersion flinkVersion = new 
FlinkVersion(flinkHome.getAbsolutePath());
+
+        PrintStream originalOut = System.out;
+        System.setOut(new PrintStream(new NullOutputStream()));
+
+        String version = flinkVersion.majorVersion();
+        float ver = Float.parseFloat(version);
+        File yaml =
+            new File(flinkHome, ver < 1.19f ? "/conf/flink-conf.yaml" : 
"/conf/config.yaml");
+
+        Map<String, String> config = 
PropertiesUtils.fromYamlFileAsJava(yaml.getAbsolutePath());
+        String flinkPort = config.getOrDefault("rest.port", "8081");
+        System.setOut(originalOut);
+        System.out.println(
+            flinkHome
+                .getAbsolutePath()
+                .concat(",")
+                .concat(flinkHome.getName())
+                .concat(",")
+                .concat(flinkPort));
+        break;
+      case "--replace":
+        String filePath = actionArgs[0];
+        String[] text = actionArgs[1].split("\\|\\|");
+        String searchText = text[0];
+        String replaceText = text[1];
+        try {
+          File file = new File(filePath);
+          String content = FileUtils.readString(file);
+          content = content.replace(searchText, replaceText);
+          FileWriter writer = new FileWriter(filePath);
+          writer.write(content);
+          writer.flush();
+          writer.close();
+          System.exit(0);
+        } catch (IOException e) {
+          System.exit(1);
         }
         break;
       default:
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
index b86a63cfb..5b2361baa 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
@@ -18,13 +18,9 @@
 package org.apache.streampark.console.core.controller;
 
 import org.apache.streampark.console.base.domain.RestResponse;
-import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.core.annotation.PermissionScope;
 import org.apache.streampark.console.core.bean.AppBuildDockerResolvedDetail;
 import org.apache.streampark.console.core.entity.AppBuildPipeline;
-import org.apache.streampark.console.core.entity.Application;
-import org.apache.streampark.console.core.entity.ApplicationLog;
-import org.apache.streampark.console.core.entity.FlinkEnv;
 import org.apache.streampark.console.core.service.AppBuildPipeService;
 import org.apache.streampark.console.core.service.ApplicationLogService;
 import org.apache.streampark.console.core.service.ApplicationService;
@@ -42,7 +38,6 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
-import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -74,52 +69,7 @@ public class ApplicationBuildPipelineController {
   @PostMapping(value = "build")
   @RequiresPermissions("app:create")
   public RestResponse buildApplication(Long appId, boolean forceBuild) throws 
Exception {
-    Application app = applicationService.getById(appId);
-
-    ApiAlertException.throwIfNull(
-        app.getVersionId(), "Please bind a Flink version to the current flink 
job.");
-    // 1) check flink version
-    FlinkEnv env = flinkEnvService.getById(app.getVersionId());
-    boolean checkVersion = env.getFlinkVersion().checkVersion(false);
-    if (!checkVersion) {
-      throw new ApiAlertException("Unsupported flink version: " + 
env.getFlinkVersion().version());
-    }
-
-    // 2) check env
-    boolean envOk = applicationService.checkEnv(app);
-    if (!envOk) {
-      throw new ApiAlertException(
-          "Check flink env failed, please check the flink version of this 
job");
-    }
-
-    if (!forceBuild && !appBuildPipeService.allowToBuildNow(appId)) {
-      throw new ApiAlertException(
-          "The job is invalid, or the job cannot be built while it is 
running");
-    }
-    // check if you need to go through the build process (if the jar and pom 
have changed,
-    // you need to go through the build process, if other common parameters 
are modified,
-    // you don't need to go through the build process)
-
-    ApplicationLog applicationLog = new ApplicationLog();
-    applicationLog.setOptionName(
-        org.apache.streampark.console.core.enums.Operation.RELEASE.getValue());
-    applicationLog.setAppId(app.getId());
-    applicationLog.setOptionTime(new Date());
-
-    boolean needBuild = applicationService.checkBuildAndUpdate(app);
-    if (!needBuild) {
-      applicationLog.setSuccess(true);
-      applicationLogService.save(applicationLog);
-      return RestResponse.success(true);
-    }
-
-    // rollback
-    if (app.isNeedRollback() && app.isFlinkSqlJob()) {
-      flinkSqlService.rollback(app);
-    }
-
-    boolean actionResult = appBuildPipeService.buildApplication(app, 
applicationLog);
-    return RestResponse.success(actionResult);
+    return applicationService.buildApplication(appId, forceBuild);
   }
 
   /**
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
index 04c4386ba..99b92b6ae 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
@@ -22,6 +22,7 @@ import 
org.apache.streampark.console.base.exception.InternalException;
 import org.apache.streampark.console.core.bean.ResponseResult;
 import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.service.FlinkClusterService;
+import org.apache.streampark.console.core.service.ServiceHelper;
 
 import org.apache.shiro.authz.annotation.RequiresPermissions;
 
@@ -42,6 +43,8 @@ public class FlinkClusterController {
 
   @Autowired private FlinkClusterService flinkClusterService;
 
+  @Autowired private ServiceHelper serviceHelper;
+
   @PostMapping("list")
   public RestResponse list() {
     List<FlinkCluster> flinkClusters = flinkClusterService.listCluster();
@@ -63,7 +66,7 @@ public class FlinkClusterController {
   @PostMapping("create")
   @RequiresPermissions("cluster:create")
   public RestResponse create(FlinkCluster cluster) {
-    Boolean success = flinkClusterService.create(cluster);
+    Boolean success = flinkClusterService.create(cluster, 
serviceHelper.getUserId());
     return RestResponse.success(success);
   }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 88db35fa1..f43058d9e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -456,7 +456,7 @@ public class Application implements Serializable {
   @SuppressWarnings("unchecked")
   public Map<String, Object> getOptionMap() {
     if (StringUtils.isBlank(this.options)) {
-      return Collections.emptyMap();
+      return new HashMap<>();
     }
     Map<String, Object> map = JacksonUtils.read(this.options, Map.class);
     map.entrySet().removeIf(entry -> entry.getValue() == null);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
new file mode 100644
index 000000000..844da1807
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.runner;
+
+import org.apache.streampark.common.enums.ClusterState;
+import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.entity.FlinkEnv;
+import org.apache.streampark.console.core.entity.FlinkSql;
+import org.apache.streampark.console.core.service.ApplicationService;
+import org.apache.streampark.console.core.service.FlinkClusterService;
+import org.apache.streampark.console.core.service.FlinkEnvService;
+import org.apache.streampark.console.core.service.FlinkSqlService;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Order
+@Slf4j
+@Component
+public class QuickStartRunner implements ApplicationRunner {
+
+  @Autowired private FlinkEnvService flinkEnvService;
+
+  @Autowired private ApplicationService applicationService;
+
+  @Autowired private FlinkClusterService flinkClusterService;
+
+  @Autowired private FlinkSqlService flinkSqlService;
+
+  private static Long demoAppId = 100000L;
+
+  @Override
+  public void run(ApplicationArguments args) throws Exception {
+    Map<String, HashMap<String, String>> map =
+        PropertiesUtils.extractMultipleArgumentsAsJava(args.getSourceArgs());
+    Map<String, String> quickstart = map.get("quickstart");
+
+    if (!quickstart.isEmpty() && quickstart.size() == 3) {
+      // 1) create flinkEnv
+      FlinkEnv flinkEnv = new FlinkEnv();
+      flinkEnv.setFlinkName(quickstart.get("flink_name"));
+      flinkEnv.setFlinkHome(quickstart.get("flink_home"));
+      flinkEnvService.create(flinkEnv);
+
+      // 2) create flinkCluster
+      FlinkCluster flinkCluster = new FlinkCluster();
+      flinkCluster.setClusterName("quickstart");
+      flinkCluster.setVersionId(flinkEnv.getId());
+      flinkCluster.setClusterState(ClusterState.STARTED.getValue());
+      flinkCluster.setExecutionMode(ExecutionMode.REMOTE.getMode());
+      flinkCluster.setAddress("http://localhost:" + quickstart.get("flink_port"));
+      flinkClusterService.create(flinkCluster, 100000L);
+
+      // 3) set flink version and cluster
+      Application app = new Application();
+      app.setId(demoAppId);
+      Application application = applicationService.getApp(app);
+      application.setFlinkClusterId(flinkCluster.getId());
+      application.setVersionId(flinkEnv.getId());
+      application.setExecutionMode(ExecutionMode.REMOTE.getMode());
+
+      FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), 
true);
+      application.setFlinkSql(flinkSql.getSql());
+
+      boolean success = applicationService.update(application);
+      if (success) {
+        // 4) build application
+        applicationService.buildApplication(demoAppId, false);
+      }
+    }
+  }
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index b9ce6d768..4d37ce836 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -19,6 +19,7 @@ package org.apache.streampark.console.core.service;
 
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.base.domain.RestResponse;
 import org.apache.streampark.console.base.exception.ApplicationException;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.enums.AppExistsState;
@@ -67,7 +68,7 @@ public interface ApplicationService extends 
IService<Application> {
 
   String readConf(String config) throws IOException;
 
-  Application getApp(Application app);
+  Application getApp(Application application);
 
   String getMain(Application application);
 
@@ -128,4 +129,6 @@ public interface ApplicationService extends 
IService<Application> {
   AppExistsState checkStart(Application app);
 
   List<ApplicationReport> getYARNApplication(String appName);
+
+  RestResponse buildApplication(Long appId, boolean forceBuild) throws 
Exception;
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index d82387d04..a96cd4e16 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -30,7 +30,7 @@ public interface FlinkClusterService extends 
IService<FlinkCluster> {
 
   ResponseResult check(FlinkCluster flinkCluster);
 
-  Boolean create(FlinkCluster flinkCluster);
+  Boolean create(FlinkCluster flinkCluster, Long userId);
 
   void delete(Long id);
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index dcdae2437..6e4f6e7ad 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -34,6 +34,7 @@ import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.common.util.YarnUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.base.domain.RestResponse;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApiDetailException;
 import org.apache.streampark.console.base.exception.ApplicationException;
@@ -1948,6 +1949,55 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     }
   }
 
+  @Override
+  public RestResponse buildApplication(Long appId, boolean forceBuild) throws Exception {
+    Application app = this.getById(appId);
+
+    ApiAlertException.throwIfNull(
+        app.getVersionId(), "Please bind a Flink version to the current flink job.");
+    // 1) check flink version
+    FlinkEnv env = flinkEnvService.getById(app.getVersionId());
+    boolean checkVersion = env.getFlinkVersion().checkVersion(false);
+    if (!checkVersion) {
+      throw new ApiAlertException("Unsupported flink version: " + env.getFlinkVersion().version());
+    }
+
+    // 2) check env
+    boolean envOk = this.checkEnv(app);
+    if (!envOk) {
+      throw new ApiAlertException(
+          "Check flink env failed, please check the flink version of this job");
+    }
+
+    if (!forceBuild && !appBuildPipeService.allowToBuildNow(appId)) {
+      throw new ApiAlertException(
+          "The job is invalid, or the job cannot be built while it is running");
+    }
+    // check if you need to go through the build process (if the jar and pom have changed,
+    // you need to go through the build process, if other common parameters are modified,
+    // you don't need to go through the build process)
+
+    ApplicationLog applicationLog = new ApplicationLog();
+    applicationLog.setOptionName(
+        org.apache.streampark.console.core.enums.Operation.RELEASE.getValue());
+    applicationLog.setAppId(app.getId());
+    applicationLog.setOptionTime(new Date());
+
+    boolean needBuild = this.checkBuildAndUpdate(app);
+    if (!needBuild) {
+      applicationLog.setSuccess(true);
+      applicationLogService.save(applicationLog);
+      return RestResponse.success(true);
+    }
+
+    // rollback
+    if (app.isNeedRollback() && app.isFlinkSqlJob()) {
+      flinkSqlService.rollback(app);
+    }
+    boolean actionResult = appBuildPipeService.buildApplication(app, applicationLog);
+    return RestResponse.success(actionResult);
+  }
+
   private Tuple2<String, String> getNamespaceClusterId(Application 
application) {
     String clusterId = null;
     String k8sNamespace = null;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index efc2d295a..1cc9a82f4 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -139,8 +139,8 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
   }
 
   @Override
-  public Boolean create(FlinkCluster flinkCluster) {
-    flinkCluster.setUserId(serviceHelper.getUserId());
+  public Boolean create(FlinkCluster flinkCluster, Long userId) {
+    flinkCluster.setUserId(userId);
     boolean successful = validateQueueIfNeeded(flinkCluster);
     ApiAlertException.throwIfFalse(
         successful, String.format(ERROR_CLUSTER_QUEUE_HINT, 
flinkCluster.getYarnQueue()));
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
index 4a19b709f..93c0c5490 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.system.runner;
 
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.util.SystemPropertyUtils;
 
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -27,6 +27,8 @@ import 
org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Component;
 
+import java.time.LocalDateTime;
+
 @Order
 @Slf4j
 @Component
@@ -37,7 +39,20 @@ public class StartedUpRunner implements ApplicationRunner {
   @Override
   public void run(ApplicationArguments args) {
     if (context.isActive()) {
-      ConfigConst.printLogo("streampark-console start successful");
+      String port = SystemPropertyUtils.get("server.port", "10000");
+      System.out.println("\n");
+      System.out.println("        _____ __                                     
        __       ");
+      System.out.println("       / ___// /_________  ____ _____ ___  ____  
____ ______/ /__     ");
+      System.out.println("       \\__ \\/ __/ ___/ _ \\/ __ `/ __ `__ \\/ __ 
\\  __ `/ ___/ //_/");
+      System.out.println("      ___/ / /_/ /  /  __/ /_/ / / / / / / /_/ / /_/ 
/ /  / ,<        ");
+      System.out.println("     /____/\\__/_/   \\___/\\__,_/_/ /_/ /_/ 
____/\\__,_/_/  /_/|_|   ");
+      System.out.println("                                       /_/           
             \n\n");
+      System.out.println("    Version:  2.1.5                                  
                 ");
+      System.out.println("    WebSite:  https://streampark.apache.org          
                 ");
+      System.out.println("    GitHub :  
https://github.com/apache/incubator-streampark          ");
+      System.out.println("    Info   :  streampark-console start successful    
                 ");
+      System.out.println("    Local  :  http://localhost:" + port);
+      System.out.println("    Time   :  " + LocalDateTime.now() + "\n\n");
     }
   }
 }
diff --git a/streampark.sh b/streampark.sh
new file mode 100755
index 000000000..0b7e7d312
--- /dev/null
+++ b/streampark.sh
@@ -0,0 +1,248 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# shellcheck disable=SC2317
+
+# Bugzilla 37848: When no TTY is available, don't output to console
+have_tty=0
+# shellcheck disable=SC2006
+if [[ "`tty`" != "not a tty" ]]; then
+    have_tty=1
+fi
+
+# Bugzilla 37848: When no TTY is available, don't output to console
+have_tty=0
+# shellcheck disable=SC2006
+if [[ "`tty`" != "not a tty" ]]; then
+    have_tty=1
+fi
+
+ # Only use colors if connected to a terminal
+if [[ ${have_tty} -eq 1 ]]; then
+  RED=$(printf '\033[31m')
+  GREEN=$(printf '\033[32m')
+  BLUE=$(printf '\033[34m')
+  RESET=$(printf '\033[0m')
+else
+  RED=""
+  GREEN=""
+  BLUE=""
+  RESET=""
+fi
+
+echo_r () {
+    # Color red: Error, Failed
+    [[ $# -ne 1 ]] && return 1
+    # shellcheck disable=SC2059
+    printf "[%sStreamPark%s] %s$1%s\n"  "$BLUE" "$RESET" "$RED" "$RESET"
+}
+
+echo_g () {
+    # Color green: Success
+    [[ $# -ne 1 ]] && return 1
+    # shellcheck disable=SC2059
+    printf "[%sStreamPark%s] %s$1%s\n"  "$BLUE" "$RESET" "$GREEN" "$RESET"
+}
+
+# OS specific support.  $var _must_ be set to either true or false.
+cygwin=false;
+darwin=false;
+mingw=false
+case "$(uname)" in
+  CYGWIN*) cygwin=true ;;
+  MINGW*) mingw=true;;
+  Darwin*) darwin=true
+    # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
+    # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
+    if [ -z "$JAVA_HOME" ]; then
+      if [ -x "/usr/libexec/java_home" ]; then
+        JAVA_HOME="$(/usr/libexec/java_home)"; export JAVA_HOME
+      else
+        JAVA_HOME="/Library/Java/Home"; export JAVA_HOME
+      fi
+    fi
+    ;;
+esac
+
+if [ -z "$JAVA_HOME" ] ; then
+  if [ -r /etc/gentoo-release ] ; then
+    JAVA_HOME=$(java-config --jre-home)
+  fi
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+  [ -n "$JAVA_HOME" ] &&
+    JAVA_HOME=$(cygpath --unix "$JAVA_HOME")
+  [ -n "$CLASSPATH" ] &&
+    CLASSPATH=$(cygpath --path --unix "$CLASSPATH")
+fi
+
+# For Mingw, ensure paths are in UNIX format before anything is touched
+if $mingw ; then
+  [ -n "$JAVA_HOME" ] && [ -d "$JAVA_HOME" ] &&
+    JAVA_HOME="$(cd "$JAVA_HOME" || (echo_r "cannot cd into $JAVA_HOME."; exit 1); pwd)"
+fi
+
+if [ -z "$JAVA_HOME" ]; then
+  javaExecutable="$(which javac)"
+  if [ -n "$javaExecutable" ] && ! [ "$(expr "\"$javaExecutable\"" : '\([^ ]*\)')" = "no" ]; then
+    # readlink(1) is not available as standard on Solaris 10.
+    readLink=$(which readlink)
+    if [ ! "$(expr "$readLink" : '\([^ ]*\)')" = "no" ]; then
+      if $darwin ; then
+        javaHome="$(dirname "\"$javaExecutable\"")"
+        javaExecutable="$(cd "\"$javaHome\"" && pwd -P)/javac"
+      else
+        javaExecutable="$(readlink -f "\"$javaExecutable\"")"
+      fi
+      javaHome="$(dirname "\"$javaExecutable\"")"
+      javaHome=$(expr "$javaHome" : '\(.*\)/bin')
+      JAVA_HOME="$javaHome"
+      export JAVA_HOME
+    fi
+  fi
+fi
+
+if [ -z "$JAVACMD" ] ; then
+  if [ -n "$JAVA_HOME"  ] ; then
+    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+      # IBM's JDK on AIX uses strange locations for the executables
+      JAVACMD="$JAVA_HOME/jre/sh/java"
+    else
+      JAVACMD="$JAVA_HOME/bin/java"
+    fi
+  else
+    JAVACMD="$(\unset -f command 2>/dev/null; \command -v java)"
+  fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+  echo_r "Error: JAVA_HOME is not defined correctly." >&2
+  echo_r "  We cannot execute $JAVACMD" >&2
+  exit 1
+fi
+
+if [ -z "$JAVA_HOME" ] ; then
+  echo_r "Warning: JAVA_HOME environment variable is not set."
+fi
+
+_RUNJAVA="$JAVA_HOME/bin/java"
+
+# resolve links - $0 may be a softlink
+PRG="$0"
+
+while [[ -h "$PRG" ]]; do
+  # shellcheck disable=SC2006
+  ls=`ls -ld "$PRG"`
+  # shellcheck disable=SC2006
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '/.*' > /dev/null; then
+    PRG="$link"
+  else
+    # shellcheck disable=SC2006
+    PRG=`dirname "$PRG"`/"$link"
+  fi
+done
+
+# Get standard environment variables
+# shellcheck disable=SC2006
+PRG_DIR=`dirname "$PRG"`
+WORK_DIR=$(cd "$PRG_DIR" >/dev/null || exit; pwd)
+
+SP_VERSION="2.1.5"
+SP_NAME="apache-streampark_2.12-${SP_VERSION}-incubating-bin"
+SP_TAR="${SP_NAME}.tar.gz"
+SP_URL="https://archive.apache.org/dist/incubator/streampark/${SP_VERSION}/${SP_TAR}"
+SP_HOME="${WORK_DIR}"/"${SP_NAME}"
+SP_CONFIG="${SP_HOME}/conf/config.yaml"
+
+download() {
+  local url=$1
+  local name=$2
+  if command -v wget > /dev/null; then
+     wget "$url" || rm -f "$name"
+     # shellcheck disable=SC2181
+     if [[ $? -ne 0 ]]; then
+        echo_r "download $name failed. url: $url"
+        exit 1
+     fi
+  elif command -v curl > /dev/null; then
+     curl "$url" -f -L || rm -f "$name"
+     # shellcheck disable=SC2181
+     if [[ $? -ne 0 ]]; then
+       echo_r "download $name failed. url: $url"
+       exit 1
+     fi
+  fi
+}
+
+BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils"
+
+# 1). download streampark.
+echo_g "download streampark..."
+
+# download "$SP_URL" "$SP_TAR"
+tar -xvf "${SP_TAR}" >/dev/null 2>&1 \
+    && rm -r "${SP_TAR}" \
+    && mkdir "${SP_HOME}"/flink \
+    && mkdir "${SP_HOME}"/workspace
+
+# 1.1) workspace
+$_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --replace "$SP_CONFIG" "local: ||local: ${SP_HOME}/workspace  #"
+
+# 1.2) port.
+SP_PORT=$($_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --free_port "10000")
+$_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --replace "$SP_CONFIG" "port: 10000||port: ${SP_PORT}"
+
+# 2). flink
+# shellcheck disable=SC2009
+FLINK_PROCESS="$(ps -ef | grep "flink-dist-" | grep 'org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint')"
+if [[ -n "${FLINK_PROCESS}" ]]; then
+  FLINK_PARAM=$($_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --read_flink "$FLINK_PROCESS")
+  IFS=',' read -r -a ARRAY <<< "$FLINK_PARAM"
+  FLINK_HOME=${ARRAY[0]}
+  FLINK_NAME=${ARRAY[1]}
+  FLINK_PORT=${ARRAY[2]}
+else
+  FLINK_NAME="flink-1.19.0"
+  FLINK_URL="https://archive.apache.org/dist/flink/${FLINK_NAME}/${FLINK_NAME}-bin-scala_2.12.tgz"
+  FLINK_TAR="${FLINK_NAME}-bin-scala_2.12.tgz"
+  FLINK_HOME="${WORK_DIR}"/${SP_NAME}/flink/${FLINK_NAME}
+  FLINK_CONF="${FLINK_HOME}/conf/config.yaml"
+
+  # 1) download flink
+  echo_g "download flink..."
+  download "$FLINK_URL" "$FLINK_TAR"
+  tar -xvf "${FLINK_TAR}" >/dev/null 2>&1 \
+    && rm -r "${FLINK_TAR}" \
+    && mv "$FLINK_NAME" "${WORK_DIR}"/"${SP_NAME}"/flink
+
+  # 2) start flink-cluster
+  FLINK_PORT=$($_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --free_port "8081")
+  $_RUNJAVA -cp "${SP_HOME}/lib/*" $BASH_UTIL --replace "$SP_CONFIG" "# port: 8081||port: ${FLINK_PORT}"
+
+  bash +x "${FLINK_HOME}"/bin/start-cluster.sh
+fi
+
+# 3) start streampark
+bash +x "${SP_HOME}"/bin/startup.sh \
+  --quickstart flink_home="$FLINK_HOME" \
+  --quickstart flink_port="$FLINK_PORT" \
+  --quickstart flink_name="quickstart-$FLINK_NAME"


Reply via email to