This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
new a97634a2c [Improve] merge from dev (#3021)
a97634a2c is described below
commit a97634a2c58c868554be0858883ce8d6219dcf8a
Author: benjobs <[email protected]>
AuthorDate: Mon Sep 4 11:16:32 2023 -0500
[Improve] merge from dev (#3021)
* [Improve] Merge from dev
* FE dependency improvement
* helm minor improvement
---
deploy/helm/README.md | 36 ++++++++
deploy/helm/streampark/templates/ingress.yaml | 22 +++--
deploy/helm/streampark/templates/streampark.yml | 19 ++---
deploy/helm/streampark/values.yaml | 63 ++++++++++----
dist-material/release-docs/LICENSE | 4 -
pom.xml | 3 +-
.../streampark/common/conf/K8sFlinkConfig.scala | 12 +++
.../src/main/assembly/bin/streampark.sh | 9 +-
.../src/main/resources/application.yml | 3 +
.../src/components/Form/src/hooks/useFormEvents.ts | 2 +-
.../default/header/components/notify/index.vue | 3 +-
.../flink/client/trait/FlinkClientTrait.scala | 94 ++++++++++-----------
.../helper/KubernetesDeploymentHelper.scala | 8 +-
.../flink/kubernetes/ingress/IngressStrategy.scala | 9 +-
.../kubernetes/ingress/IngressStrategyV1.scala | 2 +-
.../ingress/IngressStrategyV1beta1.scala | 2 +-
streampark-storage/pom.xml | 81 ------------------
.../apache/streampark/storage/StorageService.java | 28 -------
.../streampark/storage/StorageServiceConfig.java | 20 -----
.../apache/streampark/storage/oss/OssConfig.java | 31 -------
.../streampark/storage/oss/OssStorageService.java | 95 ----------------------
.../storage/oss/OssStorageServiceTest.java | 42 ----------
22 files changed, 188 insertions(+), 400 deletions(-)
diff --git a/deploy/helm/README.md b/deploy/helm/README.md
new file mode 100644
index 000000000..f43b63346
--- /dev/null
+++ b/deploy/helm/README.md
@@ -0,0 +1,36 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+# Deploy StreamPark on k8s
+
+### 1. create template
+
+```shell
+helm template streampark/ -n default -f streampark/values.yaml --output-dir ./result
+```
+
+### 2. apply
+
+```shell
+kubectl apply -f result/streampark/templates
+```
+
+### 3. open WebUI
+
+http://${host}:10000
+
+#### [more detail](streampark/templates/NOTES.txt)
diff --git a/deploy/helm/streampark/templates/ingress.yaml b/deploy/helm/streampark/templates/ingress.yaml
index 5c9ca778c..d6406bf03 100644
--- a/deploy/helm/streampark/templates/ingress.yaml
+++ b/deploy/helm/streampark/templates/ingress.yaml
@@ -15,23 +15,27 @@
# limitations under the License.
#
{{- if .Values.ingress.enabled }}
-apiVersion: networking.k8s.io/v1beta1
+apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: {{ include "streampark.name" . }}
+ namespace: {{ .Release.Namespace }}
labels:
{{- include "streampark.labels" . | nindent 4 }}
annotations:
- nginx.ingress.kubernetes.io/configuration-snippet: {{ .Values.nginx.ingress.kubernetes.configurationSnippet }}
- nginx.ingress.kubernetes.io/proxy-body-size: {{ .Values.nginx.ingress.kubernetes.proxyBbodySize }}
- nginx.ingress.kubernetes.io/rewrite-target: {{ .Values.nginx.ingress.kubernetes.rewriteTarget }}
+ {{- with .Values.ingress.annotations }}
+ {{- toYaml . | nindent 4 }}
+ {{- end }}
spec:
rules:
- host: {{ .Values.ingress.host }}
http:
paths:
- - backend:
- serviceName: {{ .Values.service.name }}
- servicePort: {{ .Values.spec.name }}
- path: {{ .Values.ingress.path }}
-{{- end }}
+ - backend:
+ service:
+ name: {{ .Values.service.name }}
+ port:
+ name: {{ .Values.spec.name }}
+ path: {{ .Values.ingress.path }}
+ pathType: {{ .Values.ingress.pathType }}
+{{- end }}
\ No newline at end of file
diff --git a/deploy/helm/streampark/templates/streampark.yml b/deploy/helm/streampark/templates/streampark.yml
index 5d7bcb916..fa8a5d4df 100755
--- a/deploy/helm/streampark/templates/streampark.yml
+++ b/deploy/helm/streampark/templates/streampark.yml
@@ -22,7 +22,7 @@ metadata:
labels:
{{- include "streampark.labels" . | nindent 4 }}
spec:
- replicas: {{ .Values.replicaCount}}
+ replicas: {{ .Values.spec.replicaCount }}
selector:
matchLabels:
{{- include "streampark.selectorLabels" . | nindent 6 }}
@@ -57,11 +57,10 @@ spec:
containerPort: {{ .Values.spec.containerPort }}
protocol: TCP
env:
- - name: TZ
- value: {{ .Values.timezone }}
+ {{- toYaml .Values.spec.container.env | nindent 12 }}
securityContext:
privileged: false
- command: ["bash","-c","bash bin/startup.sh"]
+ command: ["bash","-c","bash ./bin/streampark.sh start_docker"]
{{- if .Values.spec.livenessProbe.enabled }}
livenessProbe:
exec:
@@ -83,11 +82,10 @@ spec:
failureThreshold: {{ .Values.spec.readinessProbe.failureThreshold }}
{{- end }}
volumeMounts:
- - name: volume-docker
- mountPath: /var/run/docker.sock
- readOnly: true
- name: streampark-default-config-volume
- mountPath: /streampark/conf
+ mountPath: /usr/local/service/streampark/conf
+ resources:
+ {{- toYaml .Values.spec.resources | nindent 12 }}
volumes:
- name: streampark-default-config-volume
configMap:
@@ -109,9 +107,6 @@ spec:
path: spy.properties
- key: ValidationMessages.properties
path: ValidationMessages.properties
- - name: volume-docker
- hostPath:
- path: /var/run/docker.sock
- type: ""
+
diff --git a/deploy/helm/streampark/values.yaml b/deploy/helm/streampark/values.yaml
index a7116445b..bc97661b4 100644
--- a/deploy/helm/streampark/values.yaml
+++ b/deploy/helm/streampark/values.yaml
@@ -18,18 +18,40 @@
# When enabled RBAC is only created for said namespaces, otherwise it is done for the cluster scope.
# watchNamespaces: ["streampark"]
-timezone: "Asia/Shanghai"
-
image:
repository: "apache/streampark"
pullPolicy: "IfNotPresent"
- tag: "2.1.2"
+ tag: "2.1.1"
pullSecret: ""
rbac:
create: true
spec:
+ container:
+ env: [
+ {
+ name: TZ,
+ value: "Asia/Shanghai"
+ },
+ {
+ name: DOCKER_HOST,
+ value: "tcp://localhost:2375"
+ },
+ {
+ name: LANG,
+ value: en_US.UTF-8
+ },
+ {
+ name: LANGUAGE,
+ value: en_US:en
+ },
+ {
+ name: LC_ALL,
+ value: en_US.UTF-8
+ }
+ ]
+
replicaCount: 1
containerPort: 10000
name: rest
@@ -42,7 +64,17 @@ spec:
tolerations: [ ]
## Affinity is a group of affinity scheduling rules. If specified, the pod's scheduling constraints.
## More info: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#affinity-v1-core
- resources: { }
+ resources: {
+ limits: {
+ memory: "1Gi",
+ cpu: "1"
+ },
+ requests: {
+ memory: "1Gi",
+ cpu: "1"
+ }
+ }
+
# resources:
# limits:
# memory: "2Gi"
@@ -72,10 +104,18 @@ spec:
successThreshold: "1"
ingress:
- enabled: false
+ enabled: true
host: "streampark.apache.org"
- path: "/streampark"
- annotations: {}
+ path: "/streampark(/|$)(.*)"
+ pathType: "ImplementationSpecific"
+ annotations: {
+ nginx.ingress.kubernetes.io/rewrite-target: "/$2",
+ nginx.ingress.kubernetes.io/proxy-body-size: "1024m",
+ ## fix swagger 404: https://github.com/springdoc/springdoc-openapi/issues/1741
+ ## add rewrite ^/v3/(.*)$ /streampark/v3/$1 redirect;
+ nginx.ingress.kubernetes.io/configuration-snippet: 'rewrite ^(/streampark)$ $1/ permanent;',
+ kubernetes.io/ingress.class: "nginx"
+ }
service:
## type determines how the Service is exposed. Defaults to ClusterIP. Valid options are ExternalName, ClusterIP, NodePort, and LoadBalancer
@@ -94,11 +134,4 @@ streamParkDefaultConfiguration:
streamParkServiceAccount:
create: true
annotations: {}
- name: "streampark"
-
-nginx:
- ingress:
- kubernetes:
- configurationSnippet: "rewrite ^(/streampark)$ $1/ permanent;"
- proxyBbodySize: "1024m"
- rewriteTarget: "/$2"
+ name: "streampark"
\ No newline at end of file
diff --git a/dist-material/release-docs/LICENSE b/dist-material/release-docs/LICENSE
index 076604b61..7a903c3c2 100644
--- a/dist-material/release-docs/LICENSE
+++ b/dist-material/release-docs/LICENSE
@@ -190,10 +190,6 @@ Apache-2.0 licenses
The following components are provided under the Apache-2.0 License. See project link for details.
The text of each license is the standard Apache 2.0 license. https://www.apache.org/licenses/LICENSE-2.0.txt
- https://mvnrepository.com/artifact/com.aliyun.oss/aliyun-sdk-oss/3.15.0 Apache-2.0
- https://mvnrepository.com/artifact/com.aliyun/aliyun-java-sdk-core/4.5.10 Apache-2.0
- https://mvnrepository.com/artifact/com.aliyun/aliyun-java-sdk-kms/2.11.0 Apache-2.0
- https://mvnrepository.com/artifact/com.aliyun/aliyun-java-sdk-ram/3.1.0 Apache-2.0
https://mvnrepository.com/artifact/com.baomidou/mybatis-plus/3.5.2 Apache-2.0
https://mvnrepository.com/artifact/com.baomidou/mybatis-plus-annotation/3.5.2 Apache-2.0
https://mvnrepository.com/artifact/com.baomidou/mybatis-plus-boot-starter/3.5.2 Apache-2.0
diff --git a/pom.xml b/pom.xml
index 75a1f1f29..e3bfb67c5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,7 +78,6 @@
<modules>
<module>streampark-common</module>
<module>streampark-flink</module>
- <module>streampark-storage</module>
<module>streampark-console</module>
</modules>
@@ -117,7 +116,7 @@
<snakeyaml.version>2.0</snakeyaml.version>
<typesafe-conf.version>1.4.2</typesafe-conf.version>
<json4s-jackson.version>4.0.6</json4s-jackson.version>
- <commons-cli.version>1.3.1</commons-cli.version>
+ <commons-cli.version>1.5.0</commons-cli.version>
<commons-net.version>3.9.0</commons-net.version>
<commons-lang3.version>3.8.1</commons-lang3.version>
<enumeratum.version>1.6.1</enumeratum.version>
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
index e85b2fbcf..bda675186 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
@@ -55,6 +55,18 @@ object K8sFlinkConfig {
description = "retained tracking time for SILENT state flink tasks"
)
+ /**
+ * If an ingress controller is specified in the configuration, the ingress class
+ * kubernetes.io/ingress.class must be specified when creating the ingress, since there are often
+ * multiple ingress controllers in a production environment.
+ */
+ val ingressClass: InternalOption = InternalOption(
+ key = "streampark.flink-k8s.ingress.class",
+ defaultValue = "streampark",
+ classType = classOf[java.lang.String],
+ description = "Direct ingress to the ingress controller."
+ )
+
/** kubernetes default namespace */
val DEFAULT_KUBERNETES_NAMESPACE = "default"
diff --git a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
index d32df01fd..965f15c2c 100755
--- a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
+++ b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh
@@ -270,8 +270,8 @@ print_logo() {
printf ' %s ___/ / /_/ / / __/ /_/ / / / / / / /_/ / /_/ / / / ,< %s\n' $PRIMARY $RESET
printf ' %s /____/\__/_/ \___/\__,_/_/ /_/ /_/ ____/\__,_/_/ /_/|_| %s\n' $PRIMARY $RESET
printf ' %s /_/ %s\n\n' $PRIMARY $RESET
- printf ' %s Version: 2.1.2 %s\n' $BLUE $RESET
- printf ' %s WebSite: https://streampark.apache.org%s\n' $BLUE $RESET
+ printf ' %s Version: 2.1.2 %s\n' $BLUE $RESET
+ printf ' %s WebSite: https://streampark.apache.org%s\n' $BLUE $RESET
printf ' %s GitHub : http://github.com/apache/streampark%s\n\n' $BLUE $RESET
printf ' %s ──────── Apache StreamPark, Make stream processing easier ô~ô!%s\n\n' $PRIMARY $RESET
}
@@ -622,11 +622,12 @@ main() {
echo_r "Unknown command: $1"
echo_w "Usage: streampark.sh ( commands ... )"
echo_w "commands:"
- echo_w " start \$conf Start StreamPark with application config."
+ echo_w " start \$conf Start StreamPark with application config."
echo_w " stop Stop StreamPark, wait up to 3 seconds and then use kill -KILL if still running"
+ echo_w " start_docker start in docker or k8s mode"
echo_w " status StreamPark status"
echo_w " debug StreamPark start with debug mode,start debug mode, like: bash streampark.sh debug 10002"
- echo_w " restart \$conf restart StreamPark with application config."
+ echo_w " restart \$conf restart StreamPark with application config."
exit 0
;;
esac
diff --git a/streampark-console/streampark-console-service/src/main/resources/application.yml b/streampark-console/streampark-console-service/src/main/resources/application.yml
index 5b832f547..6f00aa0da 100644
--- a/streampark-console/streampark-console-service/src/main/resources/application.yml
+++ b/streampark-console/streampark-console-service/src/main/resources/application.yml
@@ -118,6 +118,9 @@ streampark:
polling-interval-sec:
job-status: 2
cluster-metric: 3
+ # If you need to specify an ingress controller, you can use this.
+ ingress:
+ class: nginx
# packer garbage resources collection configuration
packer-gc:
diff --git a/streampark-console/streampark-console-webapp/src/components/Form/src/hooks/useFormEvents.ts b/streampark-console/streampark-console-webapp/src/components/Form/src/hooks/useFormEvents.ts
index 6409ce5c2..32bb12095 100644
--- a/streampark-console/streampark-console-webapp/src/components/Form/src/hooks/useFormEvents.ts
+++ b/streampark-console/streampark-console-webapp/src/components/Form/src/hooks/useFormEvents.ts
@@ -109,7 +109,7 @@ export function useFormEvents({
} else {
nestKeyArray.forEach((nestKey: string) => {
try {
- const value = eval('values' + delimiter + nestKey);
+ const value = nestKey.split('.').reduce((out, item) => out[item], values);
if (isDef(value)) {
formModel[nestKey] = value;
validKeys.push(nestKey);
diff --git a/streampark-console/streampark-console-webapp/src/layouts/default/header/components/notify/index.vue b/streampark-console/streampark-console-webapp/src/layouts/default/header/components/notify/index.vue
index b2bea1b87..534c73b31 100644
--- a/streampark-console/streampark-console-webapp/src/layouts/default/header/components/notify/index.vue
+++ b/streampark-console/streampark-console-webapp/src/layouts/default/header/components/notify/index.vue
@@ -137,8 +137,9 @@
listData.value[1].list = [...data];
}
}
+
const wbSocketUrl = `${window.location.origin}${
- import.meta.env.VITE_GLOB_API_URL
+ import.meta.env.VITE_GLOB_API_URL + (import.meta.env.VITE_GLOB_API_URL_PREFIX || '')
}/websocket/${userStore.getUserInfo.userId}`;
const { data } = useWebSocket(wbSocketUrl.replace(/http/, 'ws'), {
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 8331517c8..43308ae86 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -147,6 +147,11 @@ trait FlinkClientTrait extends Logger {
}
})
}
+
+ setConfig(submitRequest, flinkConfig)
+
+ doSubmit(submitRequest, flinkConfig)
+
}
def setConfig(submitRequest: SubmitRequest, flinkConf: Configuration): Unit
@@ -304,7 +309,6 @@ trait FlinkClientTrait extends Logger {
submitRequest.appOption
.filter(
x => {
- // verify whether the parameters are valid...
val verify = commandLineOptions.hasOption(x._1)
if (!verify) logWarn(s"param:${x._1} is error,skip it.")
verify
@@ -353,8 +357,6 @@ trait FlinkClientTrait extends Logger {
logger.info(s"cliArgs: ${cliArgs.mkString(" ")}")
- FlinkRunOption.parse(commandLineOptions, cliArgs, true)
-
val commandLine = FlinkRunOption.parse(commandLineOptions, cliArgs, true)
val activeCommandLine = validateAndGetActiveCommandLine(
@@ -401,86 +403,78 @@ trait FlinkClientTrait extends Logger {
}
private[this] def extractProgramArgs(submitRequest: SubmitRequest): JavaList[String] = {
-
val programArgs = new ArrayBuffer[String]()
+ val args = submitRequest.args
- if (StringUtils.isNotEmpty(submitRequest.args)) {
- val multiLineChar = "\"\"\""
- val array = submitRequest.args.split("\\s+")
- if (!array.exists(_.startsWith(multiLineChar))) {
+ if (StringUtils.isNotEmpty(args)) {
+ val multiChar = "\""
+ val array = args.split("\\s+")
+ if (!array.exists(_.startsWith(multiChar))) {
array.foreach(programArgs +=)
} else {
val argsArray = new ArrayBuffer[String]()
val tempBuffer = new ArrayBuffer[String]()
- @tailrec def processElement(index: Int, multiLine: Boolean): Unit = {
+ @tailrec
+ def processElement(index: Int, multi: Boolean): Unit = {
+
if (index == array.length) {
if (tempBuffer.nonEmpty) {
argsArray += tempBuffer.mkString(" ")
}
return
}
+
val next = index + 1
- val elem = array(index)
+ val elem = array(index).trim
- if (elem.trim.nonEmpty) {
- if (!multiLine) {
- if (elem.startsWith(multiLineChar)) {
- tempBuffer += elem.drop(3)
- processElement(next, multiLine = true)
- } else {
- argsArray += elem
- processElement(next, multiLine = false)
- }
- } else {
- if (elem.endsWith(multiLineChar)) {
- tempBuffer += elem.dropRight(3)
+ if (elem.isEmpty) {
+ processElement(next, multi = false)
+ } else {
+ if (multi) {
+ if (elem.endsWith(multiChar)) {
+ tempBuffer += elem.dropRight(1)
argsArray += tempBuffer.mkString(" ")
tempBuffer.clear()
- processElement(next, multiLine = false)
+ processElement(next, multi = false)
} else {
tempBuffer += elem
- processElement(next, multiLine)
+ processElement(next, multi)
+ }
+ } else {
+ val until = if (elem.endsWith(multiChar)) 1 else 0
+ if (elem.startsWith(multiChar)) {
+ tempBuffer += elem.drop(1).dropRight(until)
+ processElement(next, multi = true)
+ } else {
+ argsArray += elem.dropRight(until)
+ processElement(next, multi = false)
}
}
- } else {
- tempBuffer += elem
- processElement(next, multiLine = false)
}
}
- processElement(0, multiLine = false)
- argsArray.foreach(x => programArgs += x.trim)
+ processElement(0, multi = false)
+ argsArray.foreach(x => programArgs += x)
}
}
if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
- programArgs += PARAM_KEY_FLINK_CONF
- programArgs += submitRequest.flinkYaml
- programArgs += PARAM_KEY_APP_NAME
- programArgs += DeflaterUtils.zipString(submitRequest.effectiveAppName)
- programArgs += PARAM_KEY_FLINK_PARALLELISM
- programArgs += getParallelism(submitRequest).toString
+
+ programArgs += PARAM_KEY_FLINK_CONF += submitRequest.flinkYaml
+ programArgs += PARAM_KEY_APP_NAME += DeflaterUtils.zipString(submitRequest.effectiveAppName)
+ programArgs += PARAM_KEY_FLINK_PARALLELISM += getParallelism(submitRequest).toString
+
submitRequest.developmentMode match {
case DevelopmentMode.FLINK_SQL =>
- programArgs += PARAM_KEY_FLINK_SQL
- programArgs += submitRequest.flinkSQL
+ programArgs += PARAM_KEY_FLINK_SQL += submitRequest.flinkSQL
if (submitRequest.appConf != null) {
- programArgs += PARAM_KEY_APP_CONF
- programArgs += submitRequest.appConf
+ programArgs += PARAM_KEY_APP_CONF += submitRequest.appConf
}
case _ if Try(!submitRequest.appConf.startsWith("json:")).getOrElse(true) =>
- programArgs += PARAM_KEY_APP_CONF
- programArgs += submitRequest.appConf
+ programArgs += PARAM_KEY_APP_CONF += submitRequest.appConf
}
- }
- // execution.runtime-mode
- if (submitRequest.properties.nonEmpty) {
- if (submitRequest.properties.containsKey(ExecutionOptions.RUNTIME_MODE.key())) {
- programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}"
- programArgs += submitRequest.properties.get(ExecutionOptions.RUNTIME_MODE.key()).toString
- }
}
programArgs.toList.asJava
}
@@ -491,8 +485,6 @@ trait FlinkClientTrait extends Logger {
commandLine: CommandLine): Configuration = {
require(activeCustomCommandLine != null, "activeCustomCommandLine must not be null.")
- val executorConfig = activeCustomCommandLine.toConfiguration(commandLine)
- val customConfiguration = new Configuration(executorConfig)
val configuration = new Configuration()
val flinkDefaultConfiguration = getFlinkDefaultConfiguration(flinkHome)
flinkDefaultConfiguration.keySet.foreach(
@@ -502,7 +494,7 @@ trait FlinkClientTrait extends Logger {
case _ =>
}
})
- configuration.addAll(customConfiguration)
+ configuration.addAll(activeCustomCommandLine.toConfiguration(commandLine))
configuration
}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index 59c82f471..5495814f2 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -58,7 +58,13 @@ object KubernetesDeploymentHelper extends Logger {
def getDeploymentStatusChanges(nameSpace: String, deploymentName: String): Boolean = {
Try {
val pods = getPods(nameSpace, deploymentName)
- pods.head.getStatus.getContainerStatuses.head.getLastState.getTerminated != null
+ val podStatus = pods.head.getStatus
+ podStatus.getPhase match {
+ case "Unknown" => true
+ case "Failed" => true
+ case "Pending" => false
+ case _ => podStatus.getContainerStatuses.head.getLastState.getTerminated != null
+ }
}.getOrElse(true)
}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
index e03d3ae5a..d07e2aff8 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -17,6 +17,8 @@
package org.apache.streampark.flink.kubernetes.ingress
+import org.apache.streampark.common.conf.{ConfigConst, InternalConfigHolder, K8sFlinkConfig}
+
import io.fabric8.kubernetes.api.model.{OwnerReference, OwnerReferenceBuilder}
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.apache.commons.io.FileUtils
@@ -46,11 +48,16 @@ trait IngressStrategy {
}
def buildIngressAnnotations(clusterId: String): Map[String, String] = {
- Map(
+ val annotations = Map(
"nginx.ingress.kubernetes.io/rewrite-target" -> "/$2",
"nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m",
"nginx.ingress.kubernetes.io/configuration-snippet" -> ("rewrite ^(/" +
clusterId + ")$ $1/ permanent;")
)
+ val ingressClass = InternalConfigHolder.get[String](K8sFlinkConfig.ingressClass)
+ if (ingressClass.nonEmpty) {
+ annotations + ("kubernetes.io/ingress.class" -> ingressClass)
+ }
+ annotations
}
def buildIngressLabels(clusterId: String): Map[String, String] = {
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
index 8fb5711e6..c5c2fdd0b 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala
@@ -40,7 +40,7 @@ class IngressStrategyV1 extends IngressStrategy {
Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
.map(ingress => ingress.getSpec.getRules.get(0))
.map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
- .map { case (host, path) => s"https://$host$path" }
+ .map { case (host, path) => s"http://$host$path" }
.getOrElse(clusterClient.getWebInterfaceURL)
}.recover {
case e =>
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
index 97654796c..8cbc276e4 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala
@@ -41,7 +41,7 @@ class IngressStrategyV1beta1 extends IngressStrategy {
Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get)
.map(ingress => ingress.getSpec.getRules.get(0))
.map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath)
- .map { case (host, path) => s"https://$host$path" }
+ .map { case (host, path) => s"http://$host$path" }
.getOrElse(clusterClient.getWebInterfaceURL)
}.recover {
case e =>
diff --git a/streampark-storage/pom.xml b/streampark-storage/pom.xml
deleted file mode 100644
index a427da312..000000000
--- a/streampark-storage/pom.xml
+++ /dev/null
@@ -1,81 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.streampark</groupId>
- <artifactId>streampark</artifactId>
- <version>2.1.2</version>
- </parent>
-
- <artifactId>streampark-storage</artifactId>
- <name>StreamPark : Storage Service</name>
-
- <properties>
- <maven.compiler.source>8</maven.compiler.source>
- <maven.compiler.target>8</maven.compiler.target>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <aliyun.oss.version>3.15.0</aliyun.oss.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.aliyun.oss</groupId>
- <artifactId>aliyun-sdk-oss</artifactId>
- <version>${aliyun.oss.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.jdom</groupId>
- <artifactId>jdom2</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </dependency>
-
- <!--test-->
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-engine</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-deploy-plugin</artifactId>
- <configuration>
- <skip>true</skip>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java b/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java
deleted file mode 100644
index e0ff3201c..000000000
--- a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.storage;
-
-/**
- * StorageService will be used as artifacts fetcher in pod template, so it should rely on other
- * modules.
- */
-public interface StorageService {
- void getData(String objectPath, String localFilePath) throws Exception;
-
- void putData(String objectPath, String localFilePath) throws Exception;
-}
diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageServiceConfig.java b/streampark-storage/src/main/java/org/apache/streampark/storage/StorageServiceConfig.java
deleted file mode 100644
index 8e17e33d1..000000000
--- a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageServiceConfig.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.storage;
-
-public abstract class StorageServiceConfig {}
diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssConfig.java b/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssConfig.java
deleted file mode 100644
index 986e98f73..000000000
--- a/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssConfig.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.storage.oss;
-
-import org.apache.streampark.storage.StorageServiceConfig;
-
-import lombok.Data;
-
-@Data
-public class OssConfig extends StorageServiceConfig {
- private String accessKeyId;
- private String accessKeySecret;
- private String endpoint;
- private String bucket;
- private String baseUri;
-}
diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssStorageService.java b/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssStorageService.java
deleted file mode 100644
index 98cffa270..000000000
--- a/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssStorageService.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.storage.oss;
-
-import org.apache.streampark.storage.StorageService;
-
-import com.aliyun.oss.ClientException;
-import com.aliyun.oss.OSS;
-import com.aliyun.oss.OSSClientBuilder;
-import com.aliyun.oss.OSSException;
-import com.aliyun.oss.model.GetObjectRequest;
-import com.aliyun.oss.model.PutObjectRequest;
-import com.google.common.annotations.VisibleForTesting;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.File;
-
-@Slf4j
-public class OssStorageService implements StorageService {
-
- final OssConfig ossConfig;
- final OSS ossClient;
-
- public OssStorageService(OssConfig config) {
- this.ossConfig = config;
- this.ossClient =
- new OSSClientBuilder()
- .build(
- ossConfig.getEndpoint(),
- ossConfig.getAccessKeyId(),
- ossConfig.getAccessKeySecret());
- }
-
- @Override
- public void getData(String objectPath, String localFilePath) throws Exception {
- String bucket = ossConfig.getBucket();
-
- if (!ossClient.doesObjectExist(bucket, objectPath)) {
- throw new RuntimeException(String.format("File '%s' not exist", objectPath));
- }
-
- try {
- ossClient.getObject(new GetObjectRequest(bucket, objectPath), new File(localFilePath));
- } catch (Exception e) {
- log.error("GetData failed. ObjectPath: {}, local path: {}.", objectPath, localFilePath, e);
- throw handleOssException(e);
- }
- }
-
- @Override
- public void putData(String objectPath, String localFilePath) throws Exception {
- try {
- PutObjectRequest putObjectRequest =
- new PutObjectRequest(ossConfig.getBucket(), objectPath, new File(localFilePath));
- ossClient.putObject(putObjectRequest);
- } catch (Exception e) {
- log.error("PutData failed. ObjectPath: {}, local path: {}.", objectPath, localFilePath, e);
- throw handleOssException(e);
- }
- }
-
- @VisibleForTesting
- static RuntimeException handleOssException(Exception e) {
- if (e instanceof OSSException) {
- OSSException oe = (OSSException) e;
- String errMsg =
- String.format(
- "Caught an OSSException. Error Message: %s." + " Error Code: %s.
Request ID: %s",
- oe.getErrorMessage(), oe.getErrorCode(), oe.getRequestId());
- return new RuntimeException(errMsg, oe);
- } else if (e instanceof ClientException) {
- ClientException ce = (ClientException) e;
- String errMsg =
- String.format("Caught an ClientException. Error Message: %s.",
ce.getMessage());
- return new RuntimeException(errMsg, ce);
- } else {
- return new RuntimeException(e);
- }
- }
-}
diff --git a/streampark-storage/src/test/java/org/apache/streampark/storage/oss/OssStorageServiceTest.java b/streampark-storage/src/test/java/org/apache/streampark/storage/oss/OssStorageServiceTest.java
deleted file mode 100644
index 48ab4d939..000000000
--- a/streampark-storage/src/test/java/org/apache/streampark/storage/oss/OssStorageServiceTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.storage.oss;
-
-import com.aliyun.oss.ClientException;
-import com.aliyun.oss.OSSException;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
-class OssStorageServiceTest {
-
- @Test
- void testHandleException() {
- OSSException ossException =
- new OSSException(
- "mock error", "MOCK_CODE", "requestId", "hostId", "header",
"resource", "GET");
- RuntimeException exp = OssStorageService.handleOssException(ossException);
- Assertions.assertEquals(
- "Caught an OSSException. Error Message: mock error. Error Code:
MOCK_CODE. Request ID: requestId",
- exp.getMessage());
-
- ClientException ossClientException = new ClientException("Client ERROR");
- exp = OssStorageService.handleOssException(ossClientException);
- Assertions.assertTrue(
- exp.getMessage().startsWith("Caught an ClientException. Error Message: Client ERROR"));
- }
-}