This is an automated email from the ASF dual-hosted git repository.

cancai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1326b9ff1 [Feat] sync quick-install support from dev-2.1.4 (#3763)
1326b9ff1 is described below

commit 1326b9ff1bab693f8d2d5f58d38ce9e5a779a49c
Author: benjobs <[email protected]>
AuthorDate: Sun Jun 16 20:10:12 2024 +0800

    [Feat] sync quick-install support from dev-2.1.4 (#3763)
    
    Co-authored-by: benjobs <[email protected]>
---
 .../apache/streampark/common/util/FileUtils.scala  |  18 ++
 .../streampark/common/util/PropertiesUtils.scala   |  93 +++++--
 .../src/main/assembly/bin/streampark.sh            |  51 ++--
 .../console/base/util/BashJavaUtils.java           |  90 ++++++-
 .../core/controller/FlinkClusterController.java    |   6 +-
 .../console/core/entity/AppBuildPipeline.java      |   9 +-
 .../console/core/entity/Application.java           |   2 +-
 .../console/core/entity/ApplicationConfig.java     |   6 +-
 .../console/core/entity/FlinkCluster.java          |   5 +-
 .../console/core/entity/SparkApplication.java      |   2 +-
 .../console/core/runner/QuickStartRunner.java      | 101 ++++++++
 .../console/core/service/FlinkClusterService.java  |   2 +-
 .../core/service/impl/AppBuildPipeServiceImpl.java |   6 +-
 .../core/service/impl/FlinkClusterServiceImpl.java |   7 +-
 .../service/impl/SparkAppBuildPipeServiceImpl.java |   6 +-
 .../core/utils/YarnQueueLabelExpression.java       |   3 +-
 .../console/system/runner/StartedUpRunner.java     |  19 +-
 streampark.sh                                      | 282 +++++++++++++++++++++
 18 files changed, 623 insertions(+), 85 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index e8fe7ef51..7d508b2c1 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -25,6 +25,7 @@ import java.nio.channels.Channels
 import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Paths}
 import java.util
+import java.util.Scanner
 import java.util.stream.Collectors
 
 import scala.collection.convert.ImplicitConversions._
@@ -282,4 +283,21 @@ object FileUtils {
     null
   }
 
+  @throws[IOException]
+  def readString(file: File): String = {
+    require(file != null && file.isFile)
+    val reader = new FileReader(file)
+    val scanner = new Scanner(reader)
+    val buffer = new mutable.StringBuilder()
+    if (scanner.hasNextLine) {
+      buffer.append(scanner.nextLine())
+    }
+    while (scanner.hasNextLine) {
+      buffer.append("\r\n")
+      buffer.append(scanner.nextLine())
+    }
+    Utils.close(scanner, reader)
+    buffer.toString()
+  }
+
 }
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index e3a0ae547..d9a00a6d9 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -70,13 +70,12 @@ object PropertiesUtils extends Logger {
             })
           .toMap
       case text =>
-        val value = text match {
-          case null => ""
-          case other => other.toString
-        }
-        prefix match {
-          case "" => proper += k -> value
-          case other => proper += s"$other.$k" -> value
+        if (text != null) {
+          val value = text.toString.trim
+          prefix match {
+            case "" => proper += k -> value
+            case other => proper += s"$other.$k" -> value
+          }
         }
         proper.toMap
     }
@@ -276,7 +275,7 @@ object PropertiesUtils extends Logger {
 
   /** extract flink configuration from application.properties */
   @Nonnull def extractDynamicProperties(properties: String): Map[String, 
String] = {
-    if (StringUtils.isBlank(properties)) Map.empty[String, String]
+    if (StringUtils.isEmpty(properties)) Map.empty[String, String]
     else {
       val map = mutable.Map[String, String]()
       val simple = properties.replaceAll(MULTI_PROPERTY_REGEXP, "")
@@ -308,28 +307,76 @@ object PropertiesUtils extends Logger {
   @Nonnull def extractArguments(args: String): List[String] = {
     val programArgs = new ArrayBuffer[String]()
     if (StringUtils.isNotEmpty(args)) {
-      val array = args.split("\\s+")
-      val iter = array.iterator
-      while (iter.hasNext) {
-        val v = iter.next()
-        val p = v.take(1)
-        p match {
-          case "'" | "\"" =>
-            var value = v
-            if (!v.endsWith(p)) {
-              while (!value.endsWith(p) && iter.hasNext) {
-                value += s" ${iter.next()}"
-              }
+      return extractArguments(args.split("\\s+"))
+    }
+    programArgs.toList
+  }
+
+  def extractArguments(array: Array[String]): List[String] = {
+    val programArgs = new ArrayBuffer[String]()
+    val iter = array.iterator
+    while (iter.hasNext) {
+      val v = iter.next()
+      val p = v.take(1)
+      p match {
+        case "'" | "\"" =>
+          var value = v
+          if (!v.endsWith(p)) {
+            while (!value.endsWith(p) && iter.hasNext) {
+              value += s" ${iter.next()}"
             }
-            programArgs += value.substring(1, value.length - 1)
-          case _ => programArgs += v
-        }
+          }
+          programArgs += value.substring(1, value.length - 1)
+        case _ =>
+          val regexp = "(.*)='(.*)'$"
+          if (v.matches(regexp)) {
+            programArgs += v.replaceAll(regexp, "$1=$2")
+          } else {
+            val regexp = "(.*)=\"(.*)\"$"
+            if (v.matches(regexp)) {
+              programArgs += v.replaceAll(regexp, "$1=$2")
+            } else {
+              programArgs += v
+            }
+          }
       }
     }
     programArgs.toList
   }
 
+  def extractMultipleArguments(array: Array[String]): Map[String, Map[String, 
String]] = {
+    val iter = array.iterator
+    val map = mutable.Map[String, mutable.Map[String, String]]()
+    while (iter.hasNext) {
+      val v = iter.next()
+      v.take(2) match {
+        case "--" =>
+          val kv = iter.next()
+          val regexp = "(.*)=(.*)"
+          if (kv.matches(regexp)) {
+            val values = kv.split("=")
+            val k1 = values(0).trim
+            val v1 = values(1).replaceAll("^['|\"]|['|\"]$", "")
+            val k = v.drop(2)
+            map.get(k) match {
+              case Some(m) => m += k1 -> v1
+              case _ => map += k -> mutable.Map(k1 -> v1)
+            }
+          }
+        case _ =>
+      }
+    }
+    map.map(x => x._1 -> x._2.toMap).toMap
+  }
+
   @Nonnull def extractDynamicPropertiesAsJava(properties: String): 
JavaMap[String, String] =
     new JavaMap[String, String](extractDynamicProperties(properties).asJava)
 
+  @Nonnull def extractMultipleArgumentsAsJava(
+      args: Array[String]): JavaMap[String, JavaMap[String, String]] = {
+    val map =
+      extractMultipleArguments(args).map(c => c._1 -> new JavaMap[String, 
String](c._2.asJava))
+    new JavaMap[String, JavaMap[String, String]](map.asJava)
+  }
+
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
 
b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
index 7b3e59f87..4df9e5e67 100755
--- 
a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
@@ -132,7 +132,7 @@ APP_BASE="$APP_HOME"
 APP_CONF="$APP_BASE"/conf
 APP_LIB="$APP_BASE"/lib
 APP_LOG="$APP_BASE"/logs
-APP_PID="$APP_BASE"/streampark.pid
+APP_PID="$APP_BASE"/.pid
 APP_OUT="$APP_LOG"/streampark.out
 # shellcheck disable=SC2034
 APP_TMPDIR="$APP_BASE"/temp
@@ -241,10 +241,16 @@ if [[ "$USE_NOHUP" = "true" ]]; then
   NOHUP="nohup"
 fi
 
-BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils"
+CONFIG="${APP_CONF}/config.yaml"
+# shellcheck disable=SC2006
+if [[ ! -f "$CONFIG" ]] ; then
+  echo_r "can not found config.yaml in \"conf\" directory, please check."
+  exit 1;
+fi
 
+BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils"
 APP_MAIN="org.apache.streampark.console.StreamParkConsoleBootstrap"
-
+SERVER_PORT=$($_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --get_yaml "server.port" 
"$CONFIG")
 JVM_OPTS_FILE=${APP_HOME}/bin/jvm_opts.sh
 
 JVM_ARGS=""
@@ -276,21 +282,8 @@ print_logo() {
   printf '      %s   WebSite:  https://streampark.apache.org%s\n'              
                    $BLUE   $RESET
   printf '      %s   GitHub :  http://github.com/apache/streampark%s\n\n'      
                    $BLUE   $RESET
   printf '      %s   ──────── Apache StreamPark, Make stream processing easier 
ô~ô!%s\n\n'         $PRIMARY  $RESET
-}
-
-init_env() {
-  # shellcheck disable=SC2006
-  CONFIG="${APP_CONF}/application.yml"
-  if [[ -f "$CONFIG" ]] ; then
-    echo_y """[WARN] in the \"conf\" directory, found the \"application.yml\" 
file. The \"application.yml\" file is deprecated.
-       For compatibility, this application.yml will be used preferentially. 
The latest configuration file is \"config.yaml\". It is recommended to use 
\"config.yaml\".
-       Note: \"application.yml\" will be completely deprecated in version 
2.2.0. """
-  else
-    CONFIG="${APP_CONF}/config.yaml"
-    if [[ ! -f "$CONFIG" ]] ; then
-      echo_r "can not found config.yaml in \"conf\" directory, please check."
-      exit 1;
-    fi
+  if [[ "$1"x == "start"x ]]; then
+    printf '      %s                   http://localhost:%s %s\n\n'             
                    $PRIMARY $SERVER_PORT   $RESET
   fi
 }
 
@@ -313,19 +306,19 @@ get_pid() {
   fi
 
   # shellcheck disable=SC2006
-  local serverPort=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --get_yaml 
"server.port" "$CONFIG"`
-  if [[ x"${serverPort}" == x"" ]]; then
+  if [[ "${SERVER_PORT}"x == ""x ]]; then
     echo_r "server.port is required, please check $CONFIG"
     exit 1;
   else
      # shellcheck disable=SC2006
       # shellcheck disable=SC2155
-      local used=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --check_port 
"$serverPort"`
-      if [[ x"${used}" == x"used" ]]; then
+      local used=`$_RUNJAVA -cp "$APP_LIB/*" $BASH_UTIL --check_port 
"$SERVER_PORT"`
+      if [[ "${used}"x == "used"x ]]; then
         # shellcheck disable=SC2006
         local PID=`jps -l | grep "$APP_MAIN" | awk '{print $1}'`
+        # shellcheck disable=SC2236
         if [[ ! -z $PID ]]; then
-          echo $PID
+          echo "$PID"
         else
           echo 0
         fi
@@ -411,7 +404,7 @@ start() {
     -Dapp.home="${APP_HOME}" \
     -Dlogging.config="${APP_CONF}/logback-spring.xml" \
     -Djava.io.tmpdir="$APP_TMPDIR" \
-    $APP_MAIN >> "$APP_OUT" 2>&1 "&"
+    $APP_MAIN "$@" >> "$APP_OUT" 2>&1 "&"
 
     local PID=$!
     local IS_NUMBER="^[0-9]+$"
@@ -565,27 +558,31 @@ restart() {
 }
 
 main() {
-  print_logo
-  init_env
   case "$1" in
     "debug")
         DEBUG_PORT=$2
         debug
         ;;
     "start")
-        start
+        shift
+        start "$@"
+        [[ $? -eq 0 ]] && print_logo "start"
         ;;
     "start_docker")
+        print_logo
         start_docker
         ;;
     "stop")
+        print_logo
         stop
         ;;
     "status")
+        print_logo
         status
         ;;
     "restart")
         restart
+        [[ $? -eq 0 ]] && print_logo "start"
         ;;
     *)
         echo_r "Unknown command: $1"
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java
index faf8b8ead..999167fde 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/BashJavaUtils.java
@@ -17,15 +17,30 @@
 
 package org.apache.streampark.console.base.util;
 
+import org.apache.streampark.common.conf.FlinkVersion;
+import org.apache.streampark.common.util.FileUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
 
+import org.apache.commons.io.output.NullOutputStream;
+
+import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
-import java.net.ServerSocket;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.net.Socket;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
 import java.util.Arrays;
 import java.util.Map;
 
 public class BashJavaUtils {
 
+  private static String localhost = "localhost";
+
   public static void main(String[] args) throws IOException {
     String action = args[0].toLowerCase();
     String[] actionArgs = Arrays.copyOfRange(args, 1, args.length);
@@ -39,12 +54,77 @@ public class BashJavaUtils {
         System.out.println(value);
         break;
       case "--check_port":
-        Integer port = Integer.parseInt(actionArgs[0]);
-        try {
-          new ServerSocket(port);
+        int port = Integer.parseInt(actionArgs[0]);
+        try (Socket ignored = new Socket(localhost, port)) {
+          System.out.println("used");
+        } catch (Exception e) {
           System.out.println("free");
+        }
+        break;
+      case "--free_port":
+        int start = Integer.parseInt(actionArgs[0]);
+        for (port = start; port < 65535; port++) {
+          try (Socket ignored = new Socket(localhost, port)) {
+          } catch (Exception e) {
+            System.out.println(port);
+            break;
+          }
+        }
+        break;
+      case "--read_flink":
+        String input = actionArgs[0];
+        String[] inputs = input.split(":");
+        String flinkDist =
+            Arrays.stream(inputs).filter(c -> 
c.contains("flink-dist-")).findFirst().get();
+        File flinkHome = new File(flinkDist.replaceAll("/lib/.*", ""));
+        FlinkVersion flinkVersion = new 
FlinkVersion(flinkHome.getAbsolutePath());
+
+        PrintStream originalOut = System.out;
+        System.setOut(new PrintStream(new NullOutputStream()));
+
+        String version = flinkVersion.majorVersion();
+        float ver = Float.parseFloat(version);
+        File yaml =
+            new File(flinkHome, ver < 1.19f ? "/conf/flink-conf.yaml" : 
"/conf/config.yaml");
+
+        Map<String, String> config = 
PropertiesUtils.fromYamlFileAsJava(yaml.getAbsolutePath());
+        String flinkPort = config.getOrDefault("rest.port", "8081");
+        System.setOut(originalOut);
+        System.out.println(
+            flinkHome
+                .getAbsolutePath()
+                .concat(",")
+                .concat(flinkHome.getName())
+                .concat(",")
+                .concat(flinkPort));
+        break;
+      case "--replace":
+        String filePath = actionArgs[0];
+        String[] text = actionArgs[1].split("\\|\\|");
+        String searchText = text[0];
+        String replaceText = text[1];
+        try {
+          File file = new File(filePath);
+          String content = FileUtils.readString(file);
+          content = content.replace(searchText, replaceText);
+          FileWriter writer = new FileWriter(filePath);
+          writer.write(content);
+          writer.flush();
+          writer.close();
+          System.exit(0);
+        } catch (IOException e) {
+          System.exit(1);
+        }
+        break;
+      case "--download":
+        try {
+          URL url = new URL(actionArgs[0]);
+          Path path = Paths.get(actionArgs[1]).toAbsolutePath().normalize();
+          try (InputStream inStream = url.openStream()) {
+            Files.copy(inStream, path, StandardCopyOption.REPLACE_EXISTING);
+          }
         } catch (Exception e) {
-          System.out.println("used");
+          System.exit(1);
         }
         break;
       default:
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
index 71b297d67..dd796437d 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkClusterController.java
@@ -23,6 +23,7 @@ import 
org.apache.streampark.console.base.exception.InternalException;
 import org.apache.streampark.console.core.bean.ResponseResult;
 import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.service.FlinkClusterService;
+import org.apache.streampark.console.core.service.ServiceHelper;
 
 import org.apache.shiro.authz.annotation.RequiresPermissions;
 
@@ -43,6 +44,8 @@ public class FlinkClusterController {
 
   @Autowired private FlinkClusterService flinkClusterService;
 
+  @Autowired private ServiceHelper serviceHelper;
+
   @PostMapping("availableList")
   public RestResponse listAvailableCluster() {
     List<FlinkCluster> flinkClusters = 
flinkClusterService.listAvailableCluster();
@@ -70,7 +73,8 @@ public class FlinkClusterController {
   @PostMapping("create")
   @RequiresPermissions("cluster:create")
   public RestResponse create(FlinkCluster cluster) {
-    Boolean success = flinkClusterService.create(cluster);
+    Long userId = serviceHelper.getUserId();
+    Boolean success = flinkClusterService.create(cluster, userId);
     return RestResponse.success(success);
   }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/AppBuildPipeline.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/AppBuildPipeline.java
index 75b5e386c..9314a0714 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/AppBuildPipeline.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/AppBuildPipeline.java
@@ -46,7 +46,6 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -122,7 +121,7 @@ public class AppBuildPipeline {
   @JsonIgnore
   public Map<Integer, PipelineStepStatusEnum> getStepStatus() {
     if (StringUtils.isBlank(stepStatusJson)) {
-      return Collections.emptyMap();
+      return new HashMap<>();
     }
     try {
       return JacksonUtils.read(
@@ -130,7 +129,7 @@ public class AppBuildPipeline {
     } catch (JsonProcessingException e) {
       log.error(
           "json parse error on ApplicationBuildPipeline, stepStatusJson={}", 
stepStatusJson, e);
-      return Collections.emptyMap();
+      return new HashMap<>();
     }
   }
 
@@ -153,7 +152,7 @@ public class AppBuildPipeline {
   @JsonIgnore
   public Map<Integer, Long> getStepStatusTimestamp() {
     if (StringUtils.isBlank(stepStatusTimestampJson)) {
-      return Collections.emptyMap();
+      return new HashMap<>();
     }
     try {
       return JacksonUtils.read(
@@ -163,7 +162,7 @@ public class AppBuildPipeline {
           "json parse error on ApplicationBuildPipeline, stepStatusJson={}",
           stepStatusTimestampJson,
           e);
-      return Collections.emptyMap();
+      return new HashMap<>();
     }
   }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 93dacc945..bd6498757 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -438,7 +438,7 @@ public class Application implements Serializable {
   @SuppressWarnings("unchecked")
   public Map<String, Object> getOptionMap() {
     if (StringUtils.isBlank(this.options)) {
-      return Collections.emptyMap();
+      return new HashMap<>();
     }
     Map<String, Object> optionMap = JacksonUtils.read(this.options, Map.class);
     optionMap.entrySet().removeIf(entry -> entry.getValue() == null);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
index 5a096fbde..93634c3d0 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
@@ -34,8 +34,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.jetbrains.annotations.Nullable;
 
 import java.util.Base64;
-import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
 
@@ -109,7 +109,7 @@ public class ApplicationConfig {
                   },
                   Map.Entry::getValue));
     }
-    return Collections.emptyMap();
+    return new HashMap<>();
   }
 
   @Nullable
@@ -126,7 +126,7 @@ public class ApplicationConfig {
       case HOCON:
         return 
PropertiesUtils.fromHoconTextAsJava(DeflaterUtils.unzipString(this.content));
       default:
-        return Collections.emptyMap();
+        return new HashMap<>();
     }
   }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index d1ce09e77..0314323b1 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -46,7 +46,6 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.Serializable;
 import java.net.URI;
-import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -136,7 +135,7 @@ public class FlinkCluster implements Serializable {
   @SneakyThrows
   public Map<String, Object> getOptionMap() {
     if (StringUtils.isBlank(this.options)) {
-      return Collections.emptyMap();
+      return new HashMap<>();
     }
     Map<String, Object> optionMap = JacksonUtils.read(this.options, Map.class);
     if (FlinkExecutionMode.YARN_SESSION == getFlinkExecutionModeEnum()) {
@@ -167,7 +166,7 @@ public class FlinkCluster implements Serializable {
         HttpClientUtils.httpGetRequest(
             restUrl, RequestConfig.custom().setConnectTimeout(2000, 
TimeUnit.MILLISECONDS).build());
     if (StringUtils.isBlank(json)) {
-      return Collections.emptyMap();
+      return new HashMap<>();
     }
     List<Map<String, String>> confList =
         JacksonUtils.read(json, new TypeReference<List<Map<String, String>>>() 
{});
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
index f8d3d6fad..eb8b6688d 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/SparkApplication.java
@@ -422,7 +422,7 @@ public class SparkApplication implements Serializable {
   @SuppressWarnings("unchecked")
   public Map<String, Object> getOptionMap() {
     if (StringUtils.isBlank(this.options)) {
-      return Collections.emptyMap();
+      return new HashMap<>();
     }
     Map<String, Object> optionMap = JacksonUtils.read(this.options, Map.class);
     optionMap.entrySet().removeIf(entry -> entry.getValue() == null);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
new file mode 100644
index 000000000..98a215f74
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/QuickStartRunner.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.runner;
+
+import org.apache.streampark.common.enums.ClusterState;
+import org.apache.streampark.common.enums.FlinkExecutionMode;
+import org.apache.streampark.common.util.PropertiesUtils;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.entity.FlinkEnv;
+import org.apache.streampark.console.core.entity.FlinkSql;
+import org.apache.streampark.console.core.service.AppBuildPipeService;
+import org.apache.streampark.console.core.service.FlinkClusterService;
+import org.apache.streampark.console.core.service.FlinkEnvService;
+import org.apache.streampark.console.core.service.FlinkSqlService;
+import 
org.apache.streampark.console.core.service.application.ApplicationManageService;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Order
+@Slf4j
+@Component
+public class QuickStartRunner implements ApplicationRunner {
+
+  @Autowired private FlinkEnvService flinkEnvService;
+
+  @Autowired private FlinkClusterService flinkClusterService;
+
+  @Autowired private FlinkSqlService flinkSqlService;
+
+  @Autowired private ApplicationManageService applicationManageService;
+
+  @Autowired private AppBuildPipeService appBuildPipeService;
+
+  private static Long defaultId = 100000L;
+
+  @Override
+  public void run(ApplicationArguments args) throws Exception {
+    Map<String, HashMap<String, String>> map =
+        PropertiesUtils.extractMultipleArgumentsAsJava(args.getSourceArgs());
+
+    Map<String, String> quickstart = map.get("quickstart");
+
+    if (quickstart != null && quickstart.size() == 3) {
+      // 1) create flinkEnv
+      FlinkEnv flinkEnv = new FlinkEnv();
+      flinkEnv.setFlinkName(quickstart.get("flink_name"));
+      flinkEnv.setFlinkHome(quickstart.get("flink_home"));
+      flinkEnvService.create(flinkEnv);
+
+      // 2) create flinkCluster
+      FlinkCluster flinkCluster = new FlinkCluster();
+      flinkCluster.setClusterName("quickstart");
+      flinkCluster.setVersionId(flinkEnv.getId());
+      flinkCluster.setClusterState(ClusterState.RUNNING.getState());
+      flinkCluster.setExecutionMode(FlinkExecutionMode.REMOTE.getMode());
+      flinkCluster.setAddress("http://localhost:" + 
quickstart.get("flink_port"));
+      flinkClusterService.create(flinkCluster, defaultId);
+
+      // 3) set flink version and cluster
+      Application app = new Application();
+      app.setId(defaultId);
+      Application application = applicationManageService.getApp(app.getId());
+      application.setFlinkClusterId(flinkCluster.getId());
+      application.setVersionId(flinkEnv.getId());
+      application.setExecutionMode(FlinkExecutionMode.REMOTE.getMode());
+
+      FlinkSql flinkSql = flinkSqlService.getEffective(application.getId(), 
true);
+      application.setFlinkSql(flinkSql.getSql());
+
+      boolean success = applicationManageService.update(application);
+      if (success) {
+        // 4) build application
+        appBuildPipeService.buildApplication(defaultId, false);
+      }
+    }
+  }
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
index 30f0ea881..ba24b5809 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkClusterService.java
@@ -51,7 +51,7 @@ public interface FlinkClusterService extends 
IService<FlinkCluster> {
    * @param flinkCluster FlinkCluster to be create
    * @return Whether the creation is successful
    */
-  Boolean create(FlinkCluster flinkCluster);
+  Boolean create(FlinkCluster flinkCluster, Long userId);
 
   /**
    * Remove flink cluster
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index cf4d9adf0..b85ce4f45 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -105,8 +105,8 @@ import javax.annotation.Nonnull;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -615,14 +615,14 @@ public class AppBuildPipeServiceImpl
   @Override
   public Map<Long, PipelineStatusEnum> listAppIdPipelineStatusMap(List<Long> 
appIds) {
     if (CollectionUtils.isEmpty(appIds)) {
-      return Collections.emptyMap();
+      return new HashMap<>();
     }
     LambdaQueryWrapper<AppBuildPipeline> queryWrapper =
         new 
LambdaQueryWrapper<AppBuildPipeline>().in(AppBuildPipeline::getAppId, appIds);
 
     List<AppBuildPipeline> appBuildPipelines = 
baseMapper.selectList(queryWrapper);
     if (CollectionUtils.isEmpty(appBuildPipelines)) {
-      return Collections.emptyMap();
+      return new HashMap<>();
     }
     return appBuildPipelines.stream()
         .collect(Collectors.toMap(AppBuildPipeline::getAppId, 
AppBuildPipeline::getPipelineStatus));
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 73ee6370b..e90845312 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -28,7 +28,6 @@ import org.apache.streampark.console.core.entity.FlinkCluster;
 import org.apache.streampark.console.core.mapper.FlinkClusterMapper;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;
-import org.apache.streampark.console.core.service.ServiceHelper;
 import org.apache.streampark.console.core.service.YarnQueueService;
 import 
org.apache.streampark.console.core.service.application.ApplicationInfoService;
 import org.apache.streampark.console.core.watcher.FlinkClusterWatcher;
@@ -82,8 +81,6 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
 
   @Autowired private FlinkEnvService flinkEnvService;
 
-  @Autowired private ServiceHelper serviceHelper;
-
   @Autowired private ApplicationInfoService applicationInfoService;
 
   @Autowired private YarnQueueService yarnQueueService;
@@ -141,8 +138,8 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
   }
 
   @Override
-  public Boolean create(FlinkCluster flinkCluster) {
-    flinkCluster.setUserId(serviceHelper.getUserId());
+  public Boolean create(FlinkCluster flinkCluster, Long userId) {
+    flinkCluster.setUserId(userId);
     return internalCreate(flinkCluster);
   }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
index ebaaeadd0..620e4e4a5 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SparkAppBuildPipeServiceImpl.java
@@ -86,8 +86,8 @@ import javax.annotation.Nonnull;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -437,14 +437,14 @@ public class SparkAppBuildPipeServiceImpl
   @Override
   public Map<Long, PipelineStatusEnum> listAppIdPipelineStatusMap(List<Long> 
appIds) {
     if (CollectionUtils.isEmpty(appIds)) {
-      return Collections.emptyMap();
+      return new HashMap<>();
     }
     LambdaQueryWrapper<AppBuildPipeline> queryWrapper =
         new 
LambdaQueryWrapper<AppBuildPipeline>().in(AppBuildPipeline::getAppId, appIds);
 
     List<AppBuildPipeline> appBuildPipelines = 
baseMapper.selectList(queryWrapper);
     if (CollectionUtils.isEmpty(appBuildPipelines)) {
-      return Collections.emptyMap();
+      return new HashMap<>();
     }
     return appBuildPipelines.stream()
         .collect(Collectors.toMap(AppBuildPipeline::getAppId, 
AppBuildPipeline::getPipelineStatus));
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java
index 9604084f8..f7f54514d 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java
@@ -27,7 +27,6 @@ import com.google.common.annotations.VisibleForTesting;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -105,7 +104,7 @@ public class YarnQueueLabelExpression {
 
   public static Map<String, String> getQueueLabelMap(String queueLabelExp) {
     if (StringUtils.isBlank(queueLabelExp)) {
-      return Collections.emptyMap();
+      return new HashMap<>();
     }
     YarnQueueLabelExpression yarnQueueLabelExpression = of(queueLabelExp);
     Map<String, String> queueLabelMap = new HashMap<>(2);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
index 3228c2266..9a5f05042 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.system.runner;
 
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.common.util.SystemPropertyUtils;
 
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -27,6 +27,8 @@ import 
org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Component;
 
+import java.time.LocalDateTime;
+
 @Order
 @Slf4j
 @Component
@@ -37,7 +39,20 @@ public class StartedUpRunner implements ApplicationRunner {
   @Override
   public void run(ApplicationArguments args) {
     if (context.isActive()) {
-      Utils.printLogo("streampark-console start successful");
+      String port = SystemPropertyUtils.get("server.port", "10000");
+      System.out.println("\n");
+      System.out.println("        _____ __                                     
        __       ");
+      System.out.println("       / ___// /_________  ____ _____ ___  ____  
____ ______/ /__     ");
+      System.out.println("       \\__ \\/ __/ ___/ _ \\/ __ `/ __ `__ \\/ __ 
\\  __ `/ ___/ //_/");
+      System.out.println("      ___/ / /_/ /  /  __/ /_/ / / / / / / /_/ / /_/ 
/ /  / ,<        ");
+      System.out.println("     /____/\\__/_/   \\___/\\__,_/_/ /_/ /_/ 
____/\\__,_/_/  /_/|_|   ");
+      System.out.println("                                       /_/           
             \n\n");
+      System.out.println("    Version:  2.2.0                                  
                 ");
+      System.out.println("    WebSite:  https://streampark.apache.org          
                 ");
+      System.out.println("    GitHub :  
https://github.com/apache/incubator-streampark          ");
+      System.out.println("    Info   :  streampark-console start successful    
                 ");
+      System.out.println("    Local  :  http://localhost:" + port);
+      System.out.println("    Time   :  " + LocalDateTime.now() + "\n\n");
     }
   }
 }
diff --git a/streampark.sh b/streampark.sh
new file mode 100755
index 000000000..c8ae6379d
--- /dev/null
+++ b/streampark.sh
@@ -0,0 +1,282 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# shellcheck disable=SC2317
+
+# Bugzilla 37848: When no TTY is available, don't output to console.
+# `[[ -t 0 ]]` asks the shell directly whether stdin is a terminal. This is
+# the same question the former comparison of `tty` output against the string
+# "not a tty" answered, but it does not depend on the wording of that message.
+have_tty=0
+if [[ -t 0 ]]; then
+  have_tty=1
+fi
+
+# Only use colors if connected to a terminal
+if [[ ${have_tty} -eq 1 ]]; then
+  RED=$(printf '\033[31m')
+  GREEN=$(printf '\033[32m')
+  BLUE=$(printf '\033[34m')
+  RESET=$(printf '\033[0m')
+else
+  RED=""
+  GREEN=""
+  BLUE=""
+  RESET=""
+fi
+
+# Print an error/failure message in red, prefixed with the "[StreamPark]" tag.
+# Arguments:
+#   $1 - the message to print
+# Returns:
+#   1, printing nothing, unless exactly one argument is given.
+echo_r () {
+  [[ $# -ne 1 ]] && return 1
+  # The message is passed as an argument instead of being spliced into the
+  # format string, so a "%" or "\" in it (e.g. inside a path) prints literally.
+  printf "[%sStreamPark%s] %s%s%s\n" "$BLUE" "$RESET" "$RED" "$1" "$RESET"
+}
+
+# Print a success/progress message in green, prefixed with the "[StreamPark]" tag.
+# Arguments:
+#   $1 - the message to print
+# Returns:
+#   1, printing nothing, unless exactly one argument is given.
+echo_g () {
+  [[ $# -ne 1 ]] && return 1
+  # The message is passed as an argument instead of being spliced into the
+  # format string, so a "%" or "\" in it (e.g. inside a path) prints literally.
+  printf "[%sStreamPark%s] %s%s%s\n" "$BLUE" "$RESET" "$GREEN" "$1" "$RESET"
+}
+
+# OS specific support.  $var _must_ be set to either true or false.
+# The flags are later executed as commands (`if $cygwin ; then`), which is why
+# they hold the literal words true/false.
+cygwin=false;
+darwin=false;
+mingw=false
+case "$(uname)" in
+  CYGWIN*) cygwin=true ;;
+  MINGW*) mingw=true;;
+  Darwin*) darwin=true
+    # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home
+    # See https://developer.apple.com/library/mac/qa/qa1170/_index.html
+    if [ -z "$JAVA_HOME" ]; then
+      if [ -x "/usr/libexec/java_home" ]; then
+        JAVA_HOME="$(/usr/libexec/java_home)"; export JAVA_HOME
+      else
+        JAVA_HOME="/Library/Java/Home"; export JAVA_HOME
+      fi
+    fi
+    ;;
+esac
+
+# When JAVA_HOME is unset and /etc/gentoo-release is readable, ask
+# java-config for the JRE home.
+if [ -z "$JAVA_HOME" ] ; then
+  if [ -r /etc/gentoo-release ] ; then
+    JAVA_HOME=$(java-config --jre-home)
+  fi
+fi
+
+# For Cygwin, ensure paths are in UNIX format before anything is touched
+if $cygwin ; then
+  [ -n "$JAVA_HOME" ] &&
+    JAVA_HOME=$(cygpath --unix "$JAVA_HOME")
+  [ -n "$CLASSPATH" ] &&
+    CLASSPATH=$(cygpath --path --unix "$CLASSPATH")
+fi
+
+# For Mingw, ensure paths are in UNIX format before anything is touched
+if $mingw ; then
+  if [ -n "$JAVA_HOME" ] && [ -d "$JAVA_HOME" ]; then
+    # Resolve into a scratch variable first: the error message must not end up
+    # inside JAVA_HOME, and `exit` has to run in this shell rather than in a
+    # subshell for the script to actually stop.
+    if mingw_java_home="$(cd "$JAVA_HOME" && pwd)"; then
+      JAVA_HOME="$mingw_java_home"
+    else
+      echo_r "cannot cd into $JAVA_HOME."
+      exit 1
+    fi
+  fi
+fi
+
+# JAVA_HOME is still unknown: derive it from the location of `javac` on PATH.
+# The paths are passed to expr/dirname/cd/readlink as plain quoted expansions;
+# embedding literal quote characters in them makes every lookup fail.
+if [ -z "$JAVA_HOME" ]; then
+  javaExecutable="$(which javac)"
+  # Reject `which` output whose first word is "no" (presumably a
+  # "no javac in ..." message printed on stdout by some implementations).
+  if [ -n "$javaExecutable" ] && ! [ "$(expr "$javaExecutable" : '\([^ ]*\)')" = "no" ]; then
+    # readlink(1) is not available as standard on Solaris 10.
+    readLink=$(which readlink)
+    if [ ! "$(expr "$readLink" : '\([^ ]*\)')" = "no" ]; then
+      if $darwin ; then
+        javaHome="$(dirname "$javaExecutable")"
+        javaExecutable="$(cd "$javaHome" && pwd -P)/javac"
+      else
+        javaExecutable="$(readlink -f "$javaExecutable")"
+      fi
+      # Strip the trailing "/bin/javac" to obtain the JDK root.
+      javaHome="$(dirname "$javaExecutable")"
+      javaHome=$(expr "$javaHome" : '\(.*\)/bin')
+      JAVA_HOME="$javaHome"
+      export JAVA_HOME
+    fi
+  fi
+fi
+
+# Pick the java launcher: a preset JAVACMD wins, then JAVA_HOME, then PATH.
+if [ -z "$JAVACMD" ] ; then
+  if [ -n "$JAVA_HOME"  ] ; then
+    if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
+      # IBM's JDK on AIX uses strange locations for the executables
+      JAVACMD="$JAVA_HOME/jre/sh/java"
+    else
+      JAVACMD="$JAVA_HOME/bin/java"
+    fi
+  else
+    JAVACMD="$(\unset -f command 2>/dev/null; \command -v java)"
+  fi
+fi
+
+if [ ! -x "$JAVACMD" ] ; then
+  echo_r "Error: JAVA_HOME is not defined correctly." >&2
+  echo_r "  We cannot execute $JAVACMD" >&2
+  exit 1
+fi
+
+if [ -z "$JAVA_HOME" ] ; then
+  echo_r "Warning: JAVA_HOME environment variable is not set."
+fi
+
+# Run Java through the launcher that was just verified to be executable.
+# "$JAVA_HOME/bin/java" would be "/bin/java" when JAVA_HOME is unset and would
+# miss the jre/sh/java location selected above.
+_RUNJAVA="$JAVACMD"
+
+# resolve links - $0 may be a softlink
+PRG="$0"
+
+# Follow the chain of symlinks: the target is parsed out of the `ls -ld`
+# output (the text after "-> "). An absolute target replaces PRG, a relative
+# one is taken relative to the directory of the current PRG.
+while [[ -h "$PRG" ]]; do
+  # shellcheck disable=SC2006
+  ls=`ls -ld "$PRG"`
+  # shellcheck disable=SC2006
+  link=`expr "$ls" : '.*-> \(.*\)$'`
+  if expr "$link" : '/.*' > /dev/null; then
+    PRG="$link"
+  else
+    # shellcheck disable=SC2006
+    PRG=`dirname "$PRG"`/"$link"
+  fi
+done
+
+# Get standard environment variables
+# WORK_DIR is the absolute path of the directory holding the resolved script.
+# shellcheck disable=SC2006
+PRG_DIR=`dirname "$PRG"`
+WORK_DIR=$(cd "$PRG_DIR" >/dev/null || exit; pwd)
+
+# Release coordinates of the StreamPark distribution to install.
+SP_VERSION="2.1.5"
+SP_NAME="apache-streampark_2.12-${SP_VERSION}-incubating-bin"
+SP_TAR="${SP_NAME}.tar.gz"
+SP_URL="https://archive.apache.org/dist/incubator/streampark/${SP_VERSION}/${SP_TAR}"
+# SP_PATH is where the archive is saved, SP_HOME the directory it is expected
+# to unpack to, and SP_CONFIG the configuration file inside it.
+SP_HOME="${WORK_DIR}"/"${SP_NAME}"
+SP_PATH="${WORK_DIR}"/"${SP_TAR}"
+SP_CONFIG="${SP_HOME}/conf/config.yaml"
+
+# Download a file, preferring wget, then curl, then a throwaway Java program.
+# Arguments:
+#   $1 - URL to fetch
+#   $2 - display name used in the failure message
+#   $3 - destination path
+# On failure the partial file is removed, an error is printed and the script
+# exits with status 1.
+download() {
+  local url=$1
+  local name=$2
+  local path=$3
+  # Capture the downloader's own exit status. Testing $? after
+  # `cmd || rm -f ...` would only see the status of rm and hide the failure.
+  local rc=0
+  if command -v wget > /dev/null; then
+    wget "$url" -O "$path" || rc=$?
+  elif command -v curl > /dev/null; then
+    curl -o "$path" "$url" -f -L || rc=$?
+  else
+    echo "
+      import java.io.InputStream;
+      import java.net.URL;
+      import java.nio.file.Files;
+      import java.nio.file.Path;
+      import java.nio.file.Paths;
+      import java.nio.file.StandardCopyOption;
+
+      public class Downloader {
+        public static void main(String[] args) {
+          try {
+            URL url = new URL(args[0]);
+            Path path = Paths.get(args[1]).toAbsolutePath().normalize();
+            try (InputStream inStream = url.openStream()) {
+              Files.copy(inStream, path, StandardCopyOption.REPLACE_EXISTING);
+            }
+          } catch (Exception e) {
+            System.exit(1);
+          }
+        }
+      }" > "${WORK_DIR}"/Downloader.java
+
+    # Compile and run as one chain so a compiler failure is reported as well.
+    # shellcheck disable=SC2015
+    "$JAVA_HOME/bin/javac" "${WORK_DIR}"/Downloader.java \
+      && "$JAVA_HOME/bin/java" -cp "${WORK_DIR}" Downloader "$url" "$path" \
+      || rc=$?
+    # Remove the scratch source and class files whether or not it worked.
+    rm -f "${WORK_DIR}"/Downloader.java "${WORK_DIR}"/Downloader.class
+  fi
+
+  if [[ $rc -ne 0 ]]; then
+    # Do not leave a truncated archive behind.
+    rm -f "$path"
+    echo_r "download $name failed, please try again."
+    exit 1
+  fi
+}
+
+BASH_UTIL="org.apache.streampark.console.base.util.BashJavaUtils"
+
+# Invoke the BashJavaUtils class from the unpacked StreamPark lib directory,
+# forwarding all arguments. Quoting keeps a Java path containing spaces intact.
+run_bash_util() {
+  "$_RUNJAVA" -cp "${SP_HOME}/lib/*" "$BASH_UTIL" "$@"
+}
+
+# 1). download streampark.
+echo_g "download streampark..."
+
+download "$SP_URL" "$SP_TAR" "$SP_PATH"
+# Unpack via absolute paths: the archive was saved under WORK_DIR, which is
+# not necessarily the directory the script was started from.
+if ! tar -xf "${SP_PATH}" -C "${WORK_DIR}" >/dev/null 2>&1; then
+  echo_r "unpack ${SP_TAR} failed, please try again."
+  exit 1
+fi
+rm -f "${SP_PATH}"
+mkdir -p "${SP_HOME}/flink" "${SP_HOME}/workspace"
+
+# 1.1) workspace
+# NOTE(review): --replace presumably substitutes the text before "||" with the
+# text after it inside the given file; confirm against BashJavaUtils.
+run_bash_util --replace "$SP_CONFIG" "local: ||local: ${SP_HOME}/workspace  #"
+
+# 1.2) port.
+SP_PORT=$(run_bash_util --free_port "10000")
+run_bash_util --replace "$SP_CONFIG" "port: 10000||port: ${SP_PORT}"
+
+# 2). flink
+# Reuse a standalone Flink session cluster if one is already running on this
+# host, otherwise download Flink and start one.
+# shellcheck disable=SC2009
+FLINK_PROCESS="$(ps -ef | grep "flink-dist-" | grep 'org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint')"
+if [[ -n "${FLINK_PROCESS}" ]]; then
+  # --read_flink yields a comma separated "home,name,port" triple.
+  FLINK_PARAM=$(run_bash_util --read_flink "$FLINK_PROCESS")
+  IFS=',' read -r -a ARRAY <<< "$FLINK_PARAM"
+  FLINK_HOME=${ARRAY[0]}
+  FLINK_NAME=${ARRAY[1]}
+  FLINK_PORT=${ARRAY[2]}
+else
+  FLINK_NAME="flink-1.19.0"
+  FLINK_URL="https://archive.apache.org/dist/flink/${FLINK_NAME}/${FLINK_NAME}-bin-scala_2.12.tgz"
+  FLINK_TAR="${FLINK_NAME}-bin-scala_2.12.tgz"
+  FLINK_HOME="${SP_HOME}/flink/${FLINK_NAME}"
+  FLINK_PATH="${WORK_DIR}"/"${FLINK_TAR}"
+  FLINK_CONF="${FLINK_HOME}/conf/config.yaml"
+
+  # 1) download flink
+  echo_g "download flink..."
+  download "$FLINK_URL" "$FLINK_TAR" "$FLINK_PATH"
+  # Unpack straight into ${SP_HOME}/flink so the result lands at FLINK_HOME.
+  if ! tar -xf "${FLINK_PATH}" -C "${SP_HOME}/flink" >/dev/null 2>&1; then
+    echo_r "unpack ${FLINK_TAR} failed, please try again."
+    exit 1
+  fi
+  rm -f "${FLINK_PATH}"
+
+  # 2) start flink-cluster
+  # The REST port belongs in Flink's own config file, not in StreamPark's.
+  FLINK_PORT=$(run_bash_util --free_port "8081")
+  run_bash_util --replace "$FLINK_CONF" "# port: 8081||port: ${FLINK_PORT}"
+
+  bash +x "${FLINK_HOME}"/bin/start-cluster.sh
+fi
+
+# 3) start streampark
+bash +x "${SP_HOME}"/bin/startup.sh \
+  --quickstart flink_home="$FLINK_HOME" \
+  --quickstart flink_port="$FLINK_PORT" \
+  --quickstart flink_name="quickstart-$FLINK_NAME"


Reply via email to