This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
     new a97634a2c [Improve] merge from dev (#3021)
a97634a2c is described below

commit a97634a2c58c868554be0858883ce8d6219dcf8a
Author: benjobs <[email protected]>
AuthorDate: Mon Sep 4 11:16:32 2023 -0500

    [Improve] merge from dev (#3021)
    
    * [Improve] Merge from dev
    * FE dependency improvement
    * helm minor improvement
---
 deploy/helm/README.md                              | 36 ++++++++
 deploy/helm/streampark/templates/ingress.yaml      | 22 +++--
 deploy/helm/streampark/templates/streampark.yml    | 19 ++---
 deploy/helm/streampark/values.yaml                 | 63 ++++++++++----
 dist-material/release-docs/LICENSE                 |  4 -
 pom.xml                                            |  3 +-
 .../streampark/common/conf/K8sFlinkConfig.scala    | 12 +++
 .../src/main/assembly/bin/streampark.sh            |  9 +-
 .../src/main/resources/application.yml             |  3 +
 .../src/components/Form/src/hooks/useFormEvents.ts |  2 +-
 .../default/header/components/notify/index.vue     |  3 +-
 .../flink/client/trait/FlinkClientTrait.scala      | 94 ++++++++++-----------
 .../helper/KubernetesDeploymentHelper.scala        |  8 +-
 .../flink/kubernetes/ingress/IngressStrategy.scala |  9 +-
 .../kubernetes/ingress/IngressStrategyV1.scala     |  2 +-
 .../ingress/IngressStrategyV1beta1.scala           |  2 +-
 streampark-storage/pom.xml                         | 81 ------------------
 .../apache/streampark/storage/StorageService.java  | 28 -------
 .../streampark/storage/StorageServiceConfig.java   | 20 -----
 .../apache/streampark/storage/oss/OssConfig.java   | 31 -------
 .../streampark/storage/oss/OssStorageService.java  | 95 ----------------------
 .../storage/oss/OssStorageServiceTest.java         | 42 ----------
 22 files changed, 188 insertions(+), 400 deletions(-)

diff --git a/deploy/helm/README.md b/deploy/helm/README.md
new file mode 100644
index 000000000..f43b63346
--- /dev/null
+++ b/deploy/helm/README.md
@@ -0,0 +1,36 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+# Deploy StreamPark on k8s
+
+### 1. create template
+
+```shell
+helm template streampark/ -n default -f streampark/values.yaml --output-dir ./result
+```
+
+### 2. apply
+
+```shell
+kubectl apply -f result/streampark/templates
+```
+
+### 3. open WebUI
+
+http://${host}:10000
+
+#### [more detail](streampark/templates/NOTES.txt)
diff --git a/deploy/helm/streampark/templates/ingress.yaml 
b/deploy/helm/streampark/templates/ingress.yaml
index 5c9ca778c..d6406bf03 100644
--- a/deploy/helm/streampark/templates/ingress.yaml
+++ b/deploy/helm/streampark/templates/ingress.yaml
@@ -15,23 +15,27 @@
 # limitations under the License.
 #
 {{- if .Values.ingress.enabled }}
-apiVersion: networking.k8s.io/v1beta1
+apiVersion: networking.k8s.io/v1
 kind: Ingress
 metadata:
   name: {{ include "streampark.name" . }}
+  namespace: {{ .Release.Namespace }}
   labels:
   {{- include "streampark.labels" . | nindent 4 }}
   annotations:
-    nginx.ingress.kubernetes.io/configuration-snippet: {{ .Values.nginx.ingress.kubernetes.configurationSnippet }}
-    nginx.ingress.kubernetes.io/proxy-body-size: {{ .Values.nginx.ingress.kubernetes.proxyBbodySize }}
-    nginx.ingress.kubernetes.io/rewrite-target: {{ .Values.nginx.ingress.kubernetes.rewriteTarget }}
+    {{- with .Values.ingress.annotations }}
+    {{- toYaml . | nindent 4 }}
+    {{- end }}
 spec:
   rules:
     - host: {{ .Values.ingress.host }}
       http:
         paths:
-        - backend:
-            serviceName: {{ .Values.service.name }}
-            servicePort: {{ .Values.spec.name }}
-          path: {{ .Values.ingress.path }}
-{{- end }}
+          - backend:
+              service:
+                name: {{ .Values.service.name }}
+                port:
+                  name: {{ .Values.spec.name }}
+            path: {{ .Values.ingress.path }}
+            pathType: {{ .Values.ingress.pathType }}
+{{- end }}
\ No newline at end of file
diff --git a/deploy/helm/streampark/templates/streampark.yml 
b/deploy/helm/streampark/templates/streampark.yml
index 5d7bcb916..fa8a5d4df 100755
--- a/deploy/helm/streampark/templates/streampark.yml
+++ b/deploy/helm/streampark/templates/streampark.yml
@@ -22,7 +22,7 @@ metadata:
   labels:
     {{- include "streampark.labels" . | nindent 4 }}
 spec:
-  replicas: {{ .Values.replicaCount}}
+  replicas: {{ .Values.spec.replicaCount }}
   selector:
     matchLabels:
       {{- include "streampark.selectorLabels" . | nindent 6 }}
@@ -57,11 +57,10 @@ spec:
               containerPort: {{ .Values.spec.containerPort }}
               protocol: TCP
           env:
-            - name: TZ
-              value: {{ .Values.timezone }}
+            {{- toYaml .Values.spec.container.env | nindent 12 }}
           securityContext:
             privileged: false
-          command: ["bash","-c","bash bin/startup.sh"]
+          command: ["bash","-c","bash ./bin/streampark.sh start_docker"]
           {{- if .Values.spec.livenessProbe.enabled }}
           livenessProbe:
             exec:
@@ -83,11 +82,10 @@ spec:
             failureThreshold: {{ .Values.spec.readinessProbe.failureThreshold }}
           {{- end }}
           volumeMounts:
-            - name: volume-docker
-              mountPath: /var/run/docker.sock
-              readOnly: true
             - name: streampark-default-config-volume
-              mountPath: /streampark/conf
+              mountPath: /usr/local/service/streampark/conf
+          resources:
+          {{- toYaml .Values.spec.resources | nindent 12 }}
       volumes:
         - name: streampark-default-config-volume
           configMap:
@@ -109,9 +107,6 @@ spec:
                 path: spy.properties
               - key: ValidationMessages.properties
                 path: ValidationMessages.properties
-        - name: volume-docker
-          hostPath:
-            path: /var/run/docker.sock
-            type: ""
+
 
 
diff --git a/deploy/helm/streampark/values.yaml 
b/deploy/helm/streampark/values.yaml
index a7116445b..bc97661b4 100644
--- a/deploy/helm/streampark/values.yaml
+++ b/deploy/helm/streampark/values.yaml
@@ -18,18 +18,40 @@
 # When enabled RBAC is only created for said namespaces, otherwise it is done 
for the cluster scope.
 # watchNamespaces: ["streampark"]
 
-timezone: "Asia/Shanghai"
-
 image:
   repository: "apache/streampark"
   pullPolicy: "IfNotPresent"
-  tag: "2.1.2"
+  tag: "2.1.1"
   pullSecret: ""
 
 rbac:
   create: true
 
 spec:
+  container:
+    env: [
+      {
+        name: TZ,
+        value: "Asia/Shanghai"
+      },
+      {
+        name: DOCKER_HOST,
+        value: "tcp://localhost:2375"
+      },
+      {
+        name: LANG,
+        value: en_US.UTF-8
+      },
+      {
+        name: LANGUAGE,
+        value: en_US:en
+      },
+      {
+        name: LC_ALL,
+        value: en_US.UTF-8
+      }
+    ]
+
   replicaCount: 1
   containerPort: 10000
   name: rest
@@ -42,7 +64,17 @@ spec:
   tolerations: [ ]
   ## Affinity is a group of affinity scheduling rules. If specified, the pod's 
scheduling constraints.
   ## More info: 
https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#affinity-v1-core
-  resources: { }
+  resources: {
+    limits: {
+      memory: "1Gi",
+      cpu: "1"
+    },
+    requests: {
+      memory: "1Gi",
+      cpu: "1"
+    }
+  }
+
   # resources:
   #   limits:
   #     memory: "2Gi"
@@ -72,10 +104,18 @@ spec:
     successThreshold: "1"
 
 ingress:
-  enabled: false
+  enabled: true
   host: "streampark.apache.org"
-  path: "/streampark"
-  annotations: {}
+  path: "/streampark(/|$)(.*)"
+  pathType: "ImplementationSpecific"
+  annotations: {
+    nginx.ingress.kubernetes.io/rewrite-target: "/$2",
+    nginx.ingress.kubernetes.io/proxy-body-size: "1024m",
+    ## fix swagger 404: https://github.com/springdoc/springdoc-openapi/issues/1741
+    ## add rewrite ^/v3/(.*)$ /streampark/v3/$1 redirect;
+    nginx.ingress.kubernetes.io/configuration-snippet: 'rewrite ^(/streampark)$ $1/ permanent;',
+    kubernetes.io/ingress.class: "nginx"
+  }
 
 service:
   ## type determines how the Service is exposed. Defaults to ClusterIP. Valid 
options are ExternalName, ClusterIP, NodePort, and LoadBalancer
@@ -94,11 +134,4 @@ streamParkDefaultConfiguration:
 streamParkServiceAccount:
   create: true
   annotations: {}
-  name: "streampark"
-
-nginx:
-  ingress:
-    kubernetes:
-      configurationSnippet: "rewrite ^(/streampark)$ $1/ permanent;"
-      proxyBbodySize: "1024m"
-      rewriteTarget: "/$2"
+  name: "streampark"
\ No newline at end of file
diff --git a/dist-material/release-docs/LICENSE 
b/dist-material/release-docs/LICENSE
index 076604b61..7a903c3c2 100644
--- a/dist-material/release-docs/LICENSE
+++ b/dist-material/release-docs/LICENSE
@@ -190,10 +190,6 @@ Apache-2.0 licenses
 The following components are provided under the Apache-2.0 License. See 
project link for details.
 The text of each license is the standard Apache 2.0 license. 
https://www.apache.org/licenses/LICENSE-2.0.txt
 
-    https://mvnrepository.com/artifact/com.aliyun.oss/aliyun-sdk-oss/3.15.0 
Apache-2.0
-    https://mvnrepository.com/artifact/com.aliyun/aliyun-java-sdk-core/4.5.10 
Apache-2.0
-    https://mvnrepository.com/artifact/com.aliyun/aliyun-java-sdk-kms/2.11.0 
Apache-2.0
-    https://mvnrepository.com/artifact/com.aliyun/aliyun-java-sdk-ram/3.1.0 
Apache-2.0
     https://mvnrepository.com/artifact/com.baomidou/mybatis-plus/3.5.2 
Apache-2.0
     
https://mvnrepository.com/artifact/com.baomidou/mybatis-plus-annotation/3.5.2 
Apache-2.0
     
https://mvnrepository.com/artifact/com.baomidou/mybatis-plus-boot-starter/3.5.2 
Apache-2.0
diff --git a/pom.xml b/pom.xml
index 75a1f1f29..e3bfb67c5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,7 +78,6 @@
     <modules>
         <module>streampark-common</module>
         <module>streampark-flink</module>
-        <module>streampark-storage</module>
         <module>streampark-console</module>
     </modules>
 
@@ -117,7 +116,7 @@
         <snakeyaml.version>2.0</snakeyaml.version>
         <typesafe-conf.version>1.4.2</typesafe-conf.version>
         <json4s-jackson.version>4.0.6</json4s-jackson.version>
-        <commons-cli.version>1.3.1</commons-cli.version>
+        <commons-cli.version>1.5.0</commons-cli.version>
         <commons-net.version>3.9.0</commons-net.version>
         <commons-lang3.version>3.8.1</commons-lang3.version>
         <enumeratum.version>1.6.1</enumeratum.version>
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
index e85b2fbcf..bda675186 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
@@ -55,6 +55,18 @@ object K8sFlinkConfig {
     description = "retained tracking time for SILENT state flink tasks"
   )
 
+  /**
+   * If an ingress controller is specified in the configuration, the ingress 
class
+   * kubernetes.io/ingress.class must be specified when creating the ingress, 
since there are often
+   * multiple ingress controllers in a production environment.
+   */
+  val ingressClass: InternalOption = InternalOption(
+    key = "streampark.flink-k8s.ingress.class",
+    defaultValue = "streampark",
+    classType = classOf[java.lang.String],
+    description = "Direct ingress to the ingress controller."
+  )
+
   /** kubernetes default namespace */
   val DEFAULT_KUBERNETES_NAMESPACE = "default"
 
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
 
b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
index d32df01fd..965f15c2c 100755
--- 
a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
@@ -270,8 +270,8 @@ print_logo() {
   printf '      %s  ___/ / /_/ /  /  __/ /_/ / / / / / / /_/ / /_/ / /  / ,<   
     %s\n'          $PRIMARY $RESET
   printf '      %s /____/\__/_/   \___/\__,_/_/ /_/ /_/ ____/\__,_/_/  /_/|_|  
     %s\n'          $PRIMARY $RESET
   printf '      %s                                   /_/                       
     %s\n\n'        $PRIMARY $RESET
-  printf '      %s   Version:  2.1.2 %s\n'                                     
           $BLUE   $RESET
-  printf '      %s   WebSite:  https://streampark.apache.org%s\n'              
                     $BLUE   $RESET
+  printf '      %s   Version:  2.1.2 %s\n'                                     
                    $BLUE   $RESET
+  printf '      %s   WebSite:  https://streampark.apache.org%s\n'              
                    $BLUE   $RESET
   printf '      %s   GitHub :  http://github.com/apache/streampark%s\n\n'      
                    $BLUE   $RESET
   printf '      %s   ──────── Apache StreamPark, Make stream processing easier 
ô~ô!%s\n\n'         $PRIMARY  $RESET
 }
@@ -622,11 +622,12 @@ main() {
         echo_r "Unknown command: $1"
         echo_w "Usage: streampark.sh ( commands ... )"
         echo_w "commands:"
-        echo_w "  start \$conf               Start StreamPark with application 
config."
+        echo_w "  start \$conf              Start StreamPark with application 
config."
         echo_w "  stop                      Stop StreamPark, wait up to 3 
seconds and then use kill -KILL if still running"
+        echo_w "  start_docker              start in docker or k8s mode"
         echo_w "  status                    StreamPark status"
         echo_w "  debug                     StreamPark start with debug 
mode,start debug mode, like: bash streampark.sh debug 10002"
-        echo_w "  restart \$conf             restart StreamPark with 
application config."
+        echo_w "  restart \$conf            restart StreamPark with 
application config."
         exit 0
         ;;
   esac
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/application.yml
 
b/streampark-console/streampark-console-service/src/main/resources/application.yml
index 5b832f547..6f00aa0da 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/application.yml
+++ 
b/streampark-console/streampark-console-service/src/main/resources/application.yml
@@ -118,6 +118,9 @@ streampark:
       polling-interval-sec:
         job-status: 2
         cluster-metric: 3
+    # If you need to specify an ingress controller, you can use this.
+    ingress:
+      class: nginx
 
   # packer garbage resources collection configuration
   packer-gc:
diff --git 
a/streampark-console/streampark-console-webapp/src/components/Form/src/hooks/useFormEvents.ts
 
b/streampark-console/streampark-console-webapp/src/components/Form/src/hooks/useFormEvents.ts
index 6409ce5c2..32bb12095 100644
--- 
a/streampark-console/streampark-console-webapp/src/components/Form/src/hooks/useFormEvents.ts
+++ 
b/streampark-console/streampark-console-webapp/src/components/Form/src/hooks/useFormEvents.ts
@@ -109,7 +109,7 @@ export function useFormEvents({
       } else {
         nestKeyArray.forEach((nestKey: string) => {
           try {
-            const value = eval('values' + delimiter + nestKey);
+            const value = nestKey.split('.').reduce((out, item) => out[item], values);
             if (isDef(value)) {
               formModel[nestKey] = value;
               validKeys.push(nestKey);
diff --git 
a/streampark-console/streampark-console-webapp/src/layouts/default/header/components/notify/index.vue
 
b/streampark-console/streampark-console-webapp/src/layouts/default/header/components/notify/index.vue
index b2bea1b87..534c73b31 100644
--- 
a/streampark-console/streampark-console-webapp/src/layouts/default/header/components/notify/index.vue
+++ 
b/streampark-console/streampark-console-webapp/src/layouts/default/header/components/notify/index.vue
@@ -137,8 +137,9 @@
           listData.value[1].list = [...data];
         }
       }
+
       const wbSocketUrl = `${window.location.origin}${
-        import.meta.env.VITE_GLOB_API_URL
+        import.meta.env.VITE_GLOB_API_URL + (import.meta.env.VITE_GLOB_API_URL_PREFIX || '')
       }/websocket/${userStore.getUserInfo.userId}`;
 
       const { data } = useWebSocket(wbSocketUrl.replace(/http/, 'ws'), {
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 8331517c8..43308ae86 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -147,6 +147,11 @@ trait FlinkClientTrait extends Logger {
           }
         })
     }
+
+    setConfig(submitRequest, flinkConfig)
+
+    doSubmit(submitRequest, flinkConfig)
+
   }
 
   def setConfig(submitRequest: SubmitRequest, flinkConf: Configuration): Unit
@@ -304,7 +309,6 @@ trait FlinkClientTrait extends Logger {
       submitRequest.appOption
         .filter(
           x => {
-            // 验证参数是否合法...
             val verify = commandLineOptions.hasOption(x._1)
             if (!verify) logWarn(s"param:${x._1} is error,skip it.")
             verify
@@ -353,8 +357,6 @@ trait FlinkClientTrait extends Logger {
 
     logger.info(s"cliArgs: ${cliArgs.mkString(" ")}")
 
-    FlinkRunOption.parse(commandLineOptions, cliArgs, true)
-
     val commandLine = FlinkRunOption.parse(commandLineOptions, cliArgs, true)
 
     val activeCommandLine = validateAndGetActiveCommandLine(
@@ -401,86 +403,78 @@ trait FlinkClientTrait extends Logger {
   }
 
   private[this] def extractProgramArgs(submitRequest: SubmitRequest): 
JavaList[String] = {
-
     val programArgs = new ArrayBuffer[String]()
+    val args = submitRequest.args
 
-    if (StringUtils.isNotEmpty(submitRequest.args)) {
-      val multiLineChar = "\"\"\""
-      val array = submitRequest.args.split("\\s+")
-      if (!array.exists(_.startsWith(multiLineChar))) {
+    if (StringUtils.isNotEmpty(args)) {
+      val multiChar = "\""
+      val array = args.split("\\s+")
+      if (!array.exists(_.startsWith(multiChar))) {
         array.foreach(programArgs +=)
       } else {
         val argsArray = new ArrayBuffer[String]()
         val tempBuffer = new ArrayBuffer[String]()
 
-        @tailrec def processElement(index: Int, multiLine: Boolean): Unit = {
+        @tailrec
+        def processElement(index: Int, multi: Boolean): Unit = {
+
           if (index == array.length) {
             if (tempBuffer.nonEmpty) {
               argsArray += tempBuffer.mkString(" ")
             }
             return
           }
+
           val next = index + 1
-          val elem = array(index)
+          val elem = array(index).trim
 
-          if (elem.trim.nonEmpty) {
-            if (!multiLine) {
-              if (elem.startsWith(multiLineChar)) {
-                tempBuffer += elem.drop(3)
-                processElement(next, multiLine = true)
-              } else {
-                argsArray += elem
-                processElement(next, multiLine = false)
-              }
-            } else {
-              if (elem.endsWith(multiLineChar)) {
-                tempBuffer += elem.dropRight(3)
+          if (elem.isEmpty) {
+            processElement(next, multi = false)
+          } else {
+            if (multi) {
+              if (elem.endsWith(multiChar)) {
+                tempBuffer += elem.dropRight(1)
                 argsArray += tempBuffer.mkString(" ")
                 tempBuffer.clear()
-                processElement(next, multiLine = false)
+                processElement(next, multi = false)
               } else {
                 tempBuffer += elem
-                processElement(next, multiLine)
+                processElement(next, multi)
+              }
+            } else {
+              val until = if (elem.endsWith(multiChar)) 1 else 0
+              if (elem.startsWith(multiChar)) {
+                tempBuffer += elem.drop(1).dropRight(until)
+                processElement(next, multi = true)
+              } else {
+                argsArray += elem.dropRight(until)
+                processElement(next, multi = false)
               }
             }
-          } else {
-            tempBuffer += elem
-            processElement(next, multiLine = false)
           }
         }
 
-        processElement(0, multiLine = false)
-        argsArray.foreach(x => programArgs += x.trim)
+        processElement(0, multi = false)
+        argsArray.foreach(x => programArgs += x)
       }
     }
 
     if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
-      programArgs += PARAM_KEY_FLINK_CONF
-      programArgs += submitRequest.flinkYaml
-      programArgs += PARAM_KEY_APP_NAME
-      programArgs += DeflaterUtils.zipString(submitRequest.effectiveAppName)
-      programArgs += PARAM_KEY_FLINK_PARALLELISM
-      programArgs += getParallelism(submitRequest).toString
+
+      programArgs += PARAM_KEY_FLINK_CONF += submitRequest.flinkYaml
+      programArgs += PARAM_KEY_APP_NAME += DeflaterUtils.zipString(submitRequest.effectiveAppName)
+      programArgs += PARAM_KEY_FLINK_PARALLELISM += getParallelism(submitRequest).toString
+
       submitRequest.developmentMode match {
         case DevelopmentMode.FLINK_SQL =>
-          programArgs += PARAM_KEY_FLINK_SQL
-          programArgs += submitRequest.flinkSQL
+          programArgs += PARAM_KEY_FLINK_SQL += submitRequest.flinkSQL
           if (submitRequest.appConf != null) {
-            programArgs += PARAM_KEY_APP_CONF
-            programArgs += submitRequest.appConf
+            programArgs += PARAM_KEY_APP_CONF += submitRequest.appConf
           }
         case _ if 
Try(!submitRequest.appConf.startsWith("json:")).getOrElse(true) =>
-          programArgs += PARAM_KEY_APP_CONF
-          programArgs += submitRequest.appConf
+          programArgs += PARAM_KEY_APP_CONF += submitRequest.appConf
       }
-    }
 
-    // execution.runtime-mode
-    if (submitRequest.properties.nonEmpty) {
-      if 
(submitRequest.properties.containsKey(ExecutionOptions.RUNTIME_MODE.key())) {
-        programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"
-        programArgs += 
submitRequest.properties.get(ExecutionOptions.RUNTIME_MODE.key()).toString
-      }
     }
     programArgs.toList.asJava
   }
@@ -491,8 +485,6 @@ trait FlinkClientTrait extends Logger {
       commandLine: CommandLine): Configuration = {
 
     require(activeCustomCommandLine != null, "activeCustomCommandLine must not 
be null.")
-    val executorConfig = activeCustomCommandLine.toConfiguration(commandLine)
-    val customConfiguration = new Configuration(executorConfig)
     val configuration = new Configuration()
     val flinkDefaultConfiguration = getFlinkDefaultConfiguration(flinkHome)
     flinkDefaultConfiguration.keySet.foreach(
@@ -502,7 +494,7 @@ trait FlinkClientTrait extends Logger {
           case _ =>
         }
       })
-    configuration.addAll(customConfiguration)
+    configuration.addAll(activeCustomCommandLine.toConfiguration(commandLine))
     configuration
   }
 
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index 59c82f471..5495814f2 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -58,7 +58,13 @@ object KubernetesDeploymentHelper extends Logger {
   def getDeploymentStatusChanges(nameSpace: String, deploymentName: String): 
Boolean = {
     Try {
       val pods = getPods(nameSpace, deploymentName)
-      pods.head.getStatus.getContainerStatuses.head.getLastState.getTerminated != null
+      val podStatus = pods.head.getStatus
+      podStatus.getPhase match {
+        case "Unknown" => true
+        case "Failed" => true
+        case "Pending" => false
+        case _ => podStatus.getContainerStatuses.head.getLastState.getTerminated != null
+      }
     }.getOrElse(true)
   }
 
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
index e03d3ae5a..d07e2aff8 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -17,6 +17,8 @@
 
 package org.apache.streampark.flink.kubernetes.ingress
 
+import org.apache.streampark.common.conf.{ConfigConst, InternalConfigHolder, 
K8sFlinkConfig}
+
 import io.fabric8.kubernetes.api.model.{OwnerReference, OwnerReferenceBuilder}
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
 import org.apache.commons.io.FileUtils
@@ -46,11 +48,16 @@ trait IngressStrategy {
   }
 
   def buildIngressAnnotations(clusterId: String): Map[String, String] = {
-    Map(
+    val annotations = Map(
       "nginx.ingress.kubernetes.io/rewrite-target" -> "/$2",
       "nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m",
       "nginx.ingress.kubernetes.io/configuration-snippet" -> ("rewrite ^(/" + clusterId + ")$ $1/ permanent;")
     )
+    val ingressClass = InternalConfigHolder.get[String](K8sFlinkConfig.ingressClass)
+    if (ingressClass.nonEmpty) {
+      annotations + ("kubernetes.io/ingress.class" -> ingressClass)
+    }
+    annotations
   }
 
   def buildIngressLabels(clusterId: String): Map[String, String] = {
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
index 8fb5711e6..c5c2fdd0b 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
@@ -40,7 +40,7 @@ class IngressStrategyV1 extends IngressStrategy {
           
Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
             .map(ingress => ingress.getSpec.getRules.get(0))
             .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
-            .map { case (host, path) => s"https://$host$path" }
+            .map { case (host, path) => s"http://$host$path" }
             .getOrElse(clusterClient.getWebInterfaceURL)
         }.recover {
           case e =>
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
index 97654796c..8cbc276e4 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
@@ -41,7 +41,7 @@ class IngressStrategyV1beta1 extends IngressStrategy {
           
Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
             .map(ingress => ingress.getSpec.getRules.get(0))
             .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
-            .map { case (host, path) => s"https://$host$path" }
+            .map { case (host, path) => s"http://$host$path" }
             .getOrElse(clusterClient.getWebInterfaceURL)
         }.recover {
           case e =>
diff --git a/streampark-storage/pom.xml b/streampark-storage/pom.xml
deleted file mode 100644
index a427da312..000000000
--- a/streampark-storage/pom.xml
+++ /dev/null
@@ -1,81 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.streampark</groupId>
-        <artifactId>streampark</artifactId>
-        <version>2.1.2</version>
-    </parent>
-
-    <artifactId>streampark-storage</artifactId>
-    <name>StreamPark : Storage Service</name>
-
-    <properties>
-        <maven.compiler.source>8</maven.compiler.source>
-        <maven.compiler.target>8</maven.compiler.target>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <aliyun.oss.version>3.15.0</aliyun.oss.version>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>org.projectlombok</groupId>
-            <artifactId>lombok</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>com.aliyun.oss</groupId>
-            <artifactId>aliyun-sdk-oss</artifactId>
-            <version>${aliyun.oss.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.jdom</groupId>
-                    <artifactId>jdom2</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-        </dependency>
-
-        <!--test-->
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter-engine</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-deploy-plugin</artifactId>
-                <configuration>
-                    <skip>true</skip>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
diff --git 
a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java
 
b/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java
deleted file mode 100644
index e0ff3201c..000000000
--- 
a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.storage;
-
-/**
- * StorageService will be used as artifacts fetcher in pod template, so it should rely on other
- * modules.
- */
-public interface StorageService {
-  void getData(String objectPath, String localFilePath) throws Exception;
-
-  void putData(String objectPath, String localFilePath) throws Exception;
-}
diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageServiceConfig.java b/streampark-storage/src/main/java/org/apache/streampark/storage/StorageServiceConfig.java
deleted file mode 100644
index 8e17e33d1..000000000
--- a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageServiceConfig.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.storage;
-
-public abstract class StorageServiceConfig {}
diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssConfig.java b/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssConfig.java
deleted file mode 100644
index 986e98f73..000000000
--- a/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssConfig.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.storage.oss;
-
-import org.apache.streampark.storage.StorageServiceConfig;
-
-import lombok.Data;
-
-@Data
-public class OssConfig extends StorageServiceConfig {
-  private String accessKeyId;
-  private String accessKeySecret;
-  private String endpoint;
-  private String bucket;
-  private String baseUri;
-}
diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssStorageService.java b/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssStorageService.java
deleted file mode 100644
index 98cffa270..000000000
--- a/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssStorageService.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.storage.oss;
-
-import org.apache.streampark.storage.StorageService;
-
-import com.aliyun.oss.ClientException;
-import com.aliyun.oss.OSS;
-import com.aliyun.oss.OSSClientBuilder;
-import com.aliyun.oss.OSSException;
-import com.aliyun.oss.model.GetObjectRequest;
-import com.aliyun.oss.model.PutObjectRequest;
-import com.google.common.annotations.VisibleForTesting;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.File;
-
-@Slf4j
-public class OssStorageService implements StorageService {
-
-  final OssConfig ossConfig;
-  final OSS ossClient;
-
-  public OssStorageService(OssConfig config) {
-    this.ossConfig = config;
-    this.ossClient =
-        new OSSClientBuilder()
-            .build(
-                ossConfig.getEndpoint(),
-                ossConfig.getAccessKeyId(),
-                ossConfig.getAccessKeySecret());
-  }
-
-  @Override
-  public void getData(String objectPath, String localFilePath) throws Exception {
-    String bucket = ossConfig.getBucket();
-
-    if (!ossClient.doesObjectExist(bucket, objectPath)) {
-      throw new RuntimeException(String.format("File '%s' not exist", objectPath));
-    }
-
-    try {
-      ossClient.getObject(new GetObjectRequest(bucket, objectPath), new File(localFilePath));
-    } catch (Exception e) {
-      log.error("GetData failed. ObjectPath: {}, local path: {}.", objectPath, localFilePath, e);
-      throw handleOssException(e);
-    }
-  }
-
-  @Override
-  public void putData(String objectPath, String localFilePath) throws Exception {
-    try {
-      PutObjectRequest putObjectRequest =
-          new PutObjectRequest(ossConfig.getBucket(), objectPath, new File(localFilePath));
-      ossClient.putObject(putObjectRequest);
-    } catch (Exception e) {
-      log.error("PutData failed. ObjectPath: {}, local path: {}.", objectPath, localFilePath, e);
-      throw handleOssException(e);
-    }
-  }
-
-  @VisibleForTesting
-  static RuntimeException handleOssException(Exception e) {
-    if (e instanceof OSSException) {
-      OSSException oe = (OSSException) e;
-      String errMsg =
-          String.format(
-              "Caught an OSSException. Error Message: %s." + " Error Code: %s. Request ID: %s",
-              oe.getErrorMessage(), oe.getErrorCode(), oe.getRequestId());
-      return new RuntimeException(errMsg, oe);
-    } else if (e instanceof ClientException) {
-      ClientException ce = (ClientException) e;
-      String errMsg =
-          String.format("Caught an ClientException. Error Message: %s.", ce.getMessage());
-      return new RuntimeException(errMsg, ce);
-    } else {
-      return new RuntimeException(e);
-    }
-  }
-}
diff --git a/streampark-storage/src/test/java/org/apache/streampark/storage/oss/OssStorageServiceTest.java b/streampark-storage/src/test/java/org/apache/streampark/storage/oss/OssStorageServiceTest.java
deleted file mode 100644
index 48ab4d939..000000000
--- a/streampark-storage/src/test/java/org/apache/streampark/storage/oss/OssStorageServiceTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.storage.oss;
-
-import com.aliyun.oss.ClientException;
-import com.aliyun.oss.OSSException;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-class OssStorageServiceTest {
-
-  @Test
-  void testHandleException() {
-    OSSException ossException =
-        new OSSException(
-            "mock error", "MOCK_CODE", "requestId", "hostId", "header", "resource", "GET");
-    RuntimeException exp = OssStorageService.handleOssException(ossException);
-    Assertions.assertEquals(
-        "Caught an OSSException. Error Message: mock error. Error Code: MOCK_CODE. Request ID: requestId",
-        exp.getMessage());
-
-    ClientException ossClientException = new ClientException("Client ERROR");
-    exp = OssStorageService.handleOssException(ossClientException);
-    Assertions.assertTrue(
-        exp.getMessage().startsWith("Caught an ClientException. Error Message: Client ERROR"));
-  }
-}


Reply via email to