This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch improve in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 7b94b923ff29ee511043b1691197c9077fefc5a5 Author: benjobs <[email protected]> AuthorDate: Mon Sep 4 23:47:35 2023 +0800 [Improve] Merge from dev --- deploy/helm/README.md | 36 ++++++ deploy/helm/streampark/templates/ingress.yaml | 24 ++-- deploy/helm/streampark/templates/streampark.yml | 18 +-- deploy/helm/streampark/values.yaml | 59 ++++++++-- dist-material/release-docs/LICENSE | 4 - pom.xml | 3 +- .../streampark/common/conf/K8sFlinkConfig.scala | 12 ++ .../src/main/assembly/bin/streampark.sh | 9 +- .../src/main/resources/application.yml | 3 + .../streampark-console-webapp/package.json | 131 ++++++++++----------- .../src/components/Form/src/hooks/useFormEvents.ts | 2 +- .../default/header/components/notify/index.vue | 3 +- .../flink/client/trait/FlinkClientTrait.scala | 94 +++++++-------- .../helper/KubernetesDeploymentHelper.scala | 8 +- .../flink/kubernetes/ingress/IngressStrategy.scala | 9 +- .../kubernetes/ingress/IngressStrategyV1.scala | 2 +- .../ingress/IngressStrategyV1beta1.scala | 2 +- streampark-storage/pom.xml | 81 ------------- .../apache/streampark/storage/StorageService.java | 28 ----- .../streampark/storage/StorageServiceConfig.java | 20 ---- .../apache/streampark/storage/oss/OssConfig.java | 31 ----- .../streampark/storage/oss/OssStorageService.java | 95 --------------- .../storage/oss/OssStorageServiceTest.java | 42 ------- 23 files changed, 246 insertions(+), 470 deletions(-) diff --git a/deploy/helm/README.md b/deploy/helm/README.md new file mode 100644 index 000000000..f43b63346 --- /dev/null +++ b/deploy/helm/README.md @@ -0,0 +1,36 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ + --> +# Deploy StreamPark on k8s + +### 1. create template + +```shell +helm template streampark/ -n default -f streampark/values.yaml --output-dir ./result +``` + +### 2. apply + +```shell +kubectl apply -f result/streampark/templates +``` + +### 3. open WebUI + +http://${host}:10000 + +#### [more detail](streampark/templates/NOTES.txt) diff --git a/deploy/helm/streampark/templates/ingress.yaml b/deploy/helm/streampark/templates/ingress.yaml index 5c9ca778c..4e7be4353 100644 --- a/deploy/helm/streampark/templates/ingress.yaml +++ b/deploy/helm/streampark/templates/ingress.yaml @@ -14,24 +14,28 @@ # See the License for the specific language governing permissions and # limitations under the License. # -{{- if .Values.ingress.enabled }} -apiVersion: networking.k8s.io/v1beta1 + {{- if .Values.ingress.enabled }} +apiVersion: networking.k8s.io/v1 kind: Ingress metadata: name: {{ include "streampark.name" . }} + namespace: {{ .Release.Namespace }} labels: {{- include "streampark.labels" . | nindent 4 }} annotations: - nginx.ingress.kubernetes.io/configuration-snippet: {{ .Values.nginx.ingress.kubernetes.configurationSnippet }} - nginx.ingress.kubernetes.io/proxy-body-size: {{ .Values.nginx.ingress.kubernetes.proxyBbodySize }} - nginx.ingress.kubernetes.io/rewrite-target: {{ .Values.nginx.ingress.kubernetes.rewriteTarget }} + {{- with .Values.ingress.annotations }} + {{- toYaml . | nindent 4 }} + {{- end }} spec: rules: - host: {{ .Values.ingress.host }} http: paths: - - backend: - serviceName: {{ .Values.service.name }} - servicePort: {{ .Values.spec.name }} - path: {{ .Values.ingress.path }} -{{- end }} + - backend: + service: + name: {{ .Values.service.name }} + port: + name: {{ .Values.spec.name }} + path: {{ .Values.ingress.path }} + pathType: {{ .Values.ingress.pathType }} + {{- end }} diff --git a/deploy/helm/streampark/templates/streampark.yml b/deploy/helm/streampark/templates/streampark.yml index 5d7bcb916..323e54773 100755 --- a/deploy/helm/streampark/templates/streampark.yml +++ b/deploy/helm/streampark/templates/streampark.yml @@ -22,7 +22,7 @@ metadata: labels: {{- include "streampark.labels" . | nindent 4 }} spec: - replicas: {{ .Values.replicaCount}} + replicas: {{ .Values.spec.replicaCount }} selector: matchLabels: {{- include "streampark.selectorLabels" . | nindent 6 }} @@ -57,11 +57,10 @@ spec: containerPort: {{ .Values.spec.containerPort }} protocol: TCP env: - - name: TZ - value: {{ .Values.timezone }} + {{- toYaml .Values.spec.container.env | nindent 12 }} securityContext: privileged: false - command: ["bash","-c","bash bin/startup.sh"] + command: ["bash","-c","bash ./bin/streampark.sh start_docker"] {{- if .Values.spec.livenessProbe.enabled }} livenessProbe: exec: @@ -83,11 +82,10 @@ spec: failureThreshold: {{ .Values.spec.readinessProbe.failureThreshold }} {{- end }} volumeMounts: - - name: volume-docker - mountPath: /var/run/docker.sock - readOnly: true - name: streampark-default-config-volume - mountPath: /streampark/conf + mountPath: /usr/local/service/streampark/conf + resources: + {{- toYaml .Values.spec.resources | nindent 12 }} volumes: - name: streampark-default-config-volume configMap: @@ -109,9 +107,5 @@ spec: path: spy.properties - key: ValidationMessages.properties path: ValidationMessages.properties - - name: volume-docker - hostPath: - path: /var/run/docker.sock - type: "" diff --git a/deploy/helm/streampark/values.yaml b/deploy/helm/streampark/values.yaml index a7116445b..1e156483f 100644 --- a/deploy/helm/streampark/values.yaml +++ b/deploy/helm/streampark/values.yaml @@ -18,8 +18,6 @@ # When enabled RBAC is only created for said namespaces, otherwise it is done for the cluster scope. # watchNamespaces: ["streampark"] -timezone: "Asia/Shanghai" - image: repository: "apache/streampark" pullPolicy: "IfNotPresent" @@ -30,6 +28,30 @@ rbac: create: true spec: + container: + env: [ + { + name: TZ, + value: "Asia/Shanghai" + }, + { + name: DOCKER_HOST, + value: "tcp://localhost:2375" + }, + { + name: LANG, + value: en_US.UTF-8 + }, + { + name: LANGUAGE, + value: en_US:en + }, + { + name: LC_ALL, + value: en_US.UTF-8 + } + ] + replicaCount: 1 containerPort: 10000 name: rest @@ -42,7 +64,17 @@ spec: tolerations: [ ] ## Affinity is a group of affinity scheduling rules. If specified, the pod's scheduling constraints. ## More info: https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.17/#affinity-v1-core - resources: { } + resources: { + limits: { + memory: "1Gi", + cpu: "1" + }, + requests: { + memory: "1Gi", + cpu: "1" + } + } + # resources: # limits: # memory: "2Gi" @@ -72,10 +104,18 @@ spec: successThreshold: "1" ingress: - enabled: false + enabled: true host: "streampark.apache.org" - path: "/streampark" - annotations: {} + path: "/streampark(/|$)(.*)" + pathType: "ImplementationSpecific" + annotations: { + nginx.ingress.kubernetes.io/rewrite-target: "/$2", + nginx.ingress.kubernetes.io/proxy-body-size: "1024m", + ## fix swagger 404: https://github.com/springdoc/springdoc-openapi/issues/1741 + ## add rewrite ^/v3/(.*)$ /streampark/v3/$1 redirect; + nginx.ingress.kubernetes.io/configuration-snippet: 'rewrite ^(/streampark)$ $1/ permanent;', + kubernetes.io/ingress.class: "nginx" + } service: ## type determines how the Service is exposed. Defaults to ClusterIP. Valid options are ExternalName, ClusterIP, NodePort, and LoadBalancer @@ -95,10 +135,3 @@ streamParkServiceAccount: create: true annotations: {} name: "streampark" - -nginx: - ingress: - kubernetes: - configurationSnippet: "rewrite ^(/streampark)$ $1/ permanent;" - proxyBbodySize: "1024m" - rewriteTarget: "/$2" diff --git a/dist-material/release-docs/LICENSE b/dist-material/release-docs/LICENSE index 076604b61..7a903c3c2 100644 --- a/dist-material/release-docs/LICENSE +++ b/dist-material/release-docs/LICENSE @@ -190,10 +190,6 @@ Apache-2.0 licenses The following components are provided under the Apache-2.0 License. See project link for details. The text of each license is the standard Apache 2.0 license. https://www.apache.org/licenses/LICENSE-2.0.txt - https://mvnrepository.com/artifact/com.aliyun.oss/aliyun-sdk-oss/3.15.0 Apache-2.0 - https://mvnrepository.com/artifact/com.aliyun/aliyun-java-sdk-core/4.5.10 Apache-2.0 - https://mvnrepository.com/artifact/com.aliyun/aliyun-java-sdk-kms/2.11.0 Apache-2.0 - https://mvnrepository.com/artifact/com.aliyun/aliyun-java-sdk-ram/3.1.0 Apache-2.0 https://mvnrepository.com/artifact/com.baomidou/mybatis-plus/3.5.2 Apache-2.0 https://mvnrepository.com/artifact/com.baomidou/mybatis-plus-annotation/3.5.2 Apache-2.0 https://mvnrepository.com/artifact/com.baomidou/mybatis-plus-boot-starter/3.5.2 Apache-2.0 diff --git a/pom.xml b/pom.xml index 75a1f1f29..e3bfb67c5 100644 --- a/pom.xml +++ b/pom.xml @@ -78,7 +78,6 @@ <modules> <module>streampark-common</module> <module>streampark-flink</module> - <module>streampark-storage</module> <module>streampark-console</module> </modules> @@ -117,7 +116,7 @@ <snakeyaml.version>2.0</snakeyaml.version> <typesafe-conf.version>1.4.2</typesafe-conf.version> <json4s-jackson.version>4.0.6</json4s-jackson.version> - <commons-cli.version>1.3.1</commons-cli.version> + <commons-cli.version>1.5.0</commons-cli.version> <commons-net.version>3.9.0</commons-net.version> <commons-lang3.version>3.8.1</commons-lang3.version> <enumeratum.version>1.6.1</enumeratum.version> diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala index e85b2fbcf..bda675186 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala @@ -55,6 +55,18 @@ object K8sFlinkConfig { description = "retained tracking time for SILENT state flink tasks" ) + /** + * If an ingress controller is specified in the configuration, the ingress class + * kubernetes.io/ingress.class must be specified when creating the ingress, since there are often + * multiple ingress controllers in a production environment. + */ + val ingressClass: InternalOption = InternalOption( + key = "streampark.flink-k8s.ingress.class", + defaultValue = "streampark", + classType = classOf[java.lang.String], + description = "Direct ingress to the ingress controller." + ) + /** kubernetes default namespace */ val DEFAULT_KUBERNETES_NAMESPACE = "default" diff --git a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh index d32df01fd..965f15c2c 100755 --- a/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh +++ b/streampark-console/streampark-console-service/src/main/assembly/bin/streampark.sh @@ -270,8 +270,8 @@ print_logo() { printf ' %s ___/ / /_/ / / __/ /_/ / / / / / / /_/ / /_/ / / / ,< %s\n' $PRIMARY $RESET printf ' %s /____/\__/_/ \___/\__,_/_/ /_/ /_/ ____/\__,_/_/ /_/|_| %s\n' $PRIMARY $RESET printf ' %s /_/ %s\n\n' $PRIMARY $RESET - printf ' %s Version: 2.1.2 %s\n' $BLUE $RESET - printf ' %s WebSite: https://streampark.apache.org%s\n' $BLUE $RESET + printf ' %s Version: 2.1.2 %s\n' $BLUE $RESET + printf ' %s WebSite: https://streampark.apache.org%s\n' $BLUE $RESET printf ' %s GitHub : http://github.com/apache/streampark%s\n\n' $BLUE $RESET printf ' %s ──────── Apache StreamPark, Make stream processing easier ô~ô!%s\n\n' $PRIMARY $RESET } @@ -622,11 +622,12 @@ main() { echo_r "Unknown command: $1" echo_w "Usage: streampark.sh ( commands ... )" echo_w "commands:" - echo_w " start \$conf Start StreamPark with application config." + echo_w " start \$conf Start StreamPark with application config." echo_w " stop Stop StreamPark, wait up to 3 seconds and then use kill -KILL if still running" + echo_w " start_docker start in docker or k8s mode" echo_w " status StreamPark status" echo_w " debug StreamPark start with debug mode,start debug mode, like: bash streampark.sh debug 10002" - echo_w " restart \$conf restart StreamPark with application config." + echo_w " restart \$conf restart StreamPark with application config." exit 0 ;; esac diff --git a/streampark-console/streampark-console-service/src/main/resources/application.yml b/streampark-console/streampark-console-service/src/main/resources/application.yml index 5b832f547..6f00aa0da 100644 --- a/streampark-console/streampark-console-service/src/main/resources/application.yml +++ b/streampark-console/streampark-console-service/src/main/resources/application.yml @@ -118,6 +118,9 @@ streampark: polling-interval-sec: job-status: 2 cluster-metric: 3 + # If you need to specify an ingress controller, you can use this. + ingress: + class: nginx # packer garbage resources collection configuration packer-gc: diff --git a/streampark-console/streampark-console-webapp/package.json b/streampark-console/streampark-console-webapp/package.json index 85eff3d54..e797c9fb1 100644 --- a/streampark-console/streampark-console-webapp/package.json +++ b/streampark-console/streampark-console-webapp/package.json @@ -5,7 +5,7 @@ "name": "streampark", "url": "https://streampark.apache.org" }, - "packageManager": "[email protected]", + "packageManager": "[email protected]", "repository": { "type": "git", "url": "git+https://github.com/apache/incubator-streampark.git" @@ -19,14 +19,14 @@ "bootstrap": "pnpm install", "serve": "npm run dev", "dev": "vite", - "build": "cross-env NODE_ENV=production vite build && esno ./build/script/postBuild.ts", - "build:test": "cross-env vite build --mode test && esno ./build/script/postBuild.ts", + "build": "vite build", + "build:analyze": "vite build --mode analyze", + "build:test": "cross-env vite build --mode test", "build:no-cache": "npm run clean:cache && npm run build", "report": "cross-env REPORT=true npm run build", "type:check": "vue-tsc --noEmit --skipLibCheck", "preview": "npm run build && vite preview", "preview:dist": "vite preview", - "log": "conventional-changelog -p angular -i CHANGELOG.md -s", "clean:cache": "rimraf node_modules/.cache/ && rimraf node_modules/.vite", "clean:lib": "rimraf node_modules", "lint:eslint": "eslint --cache --max-warnings 0 \"{src,mock}/**/*.{vue,ts,tsx}\" --fix", @@ -40,100 +40,89 @@ "gen:icon": "esno ./build/generate/icon/index.ts" }, "dependencies": { - "@ant-design/colors": "^6.0.0", + "@ant-design/colors": "^7.0.0", "@ant-design/icons-vue": "^6.1.0", - "@iconify/iconify": "^3.0.1", - "@vue/runtime-core": "^3.2.45", - "@vue/shared": "^3.2.45", - "@vueuse/core": "^9.6.0", - "@vueuse/shared": "^9.6.0", - "@zxcvbn-ts/core": "^2.1.0", - "ant-design-vue": "^3.2.15", - "axios": "^1.2.1", + "@iconify/iconify": "^3.1.1", + "@vue/runtime-core": "^3.3.4", + "@vue/shared": "^3.3.4", + "@vueuse/core": "^10.2.1", + "@vueuse/shared": "^10.2.1", + "ant-design-vue": "^3.2.20", + "axios": "^1.4.0", "crypto-js": "^4.1.1", - "dayjs": "^1.11.6", + "dayjs": "^1.11.9", "lodash-es": "^4.17.21", - "monaco-editor": "^0.34.1", + "monaco-editor": "^0.40.0", "nprogress": "^0.2.0", "path-to-regexp": "^6.2.1", - "penpal": "^6.2.2", - "pinia": "2.0.27", - "qs": "^6.11.0", + "pinia": "2.1.4", + "qs": "^6.11.2", "resize-observer-polyfill": "^1.5.1", - "sql-formatter": "^4.0.2", - "sweetalert2": "^11.4.18", "sortablejs": "^1.15.0", - "terser": "^5.16.1", - "vue": "^3.2.47", + "sql-formatter": "^4.0.2", + "sweetalert2": "^11.7.16", + "terser": "^5.19.0", + "vue": "^3.3.4", "vue-i18n": "^9.2.2", - "vue-router": "^4.1.6", - "vue-types": "^5.0.2" + "vue-router": "^4.2.4", + "vue-types": "^5.1.0" }, "devDependencies": { - "@commitlint/cli": "^17.3.0", - "@commitlint/config-conventional": "^17.3.0", - "@iconify/json": "^2.1.148", + "@iconify/json": "^2.2.89", "@purge-icons/generated": "^0.9.0", - "@types/fs-extra": "^9.0.13", - "@types/lodash-es": "^4.17.6", - "@types/node": "^18.11.11", + "@types/fs-extra": "^11.0.1", + "@types/lodash-es": "^4.17.7", + "@types/node": "^20.4.1", "@types/nprogress": "^0.2.0", "@types/qs": "^6.9.7", - "@types/showdown": "^2.0.0", - "@types/sortablejs": "^1.15.0", - "@typescript-eslint/eslint-plugin": "^5.45.1", - "@typescript-eslint/parser": "^5.45.1", - "@vitejs/plugin-legacy": "^2.2.0", - "@vitejs/plugin-vue": "^3.1.2", - "@vitejs/plugin-vue-jsx": "^2.0.1", - "@vue/compiler-sfc": "^3.2.47", - "autoprefixer": "^10.4.13", - "commitizen": "^4.2.5", + "@types/showdown": "^2.0.1", + "@types/sortablejs": "^1.15.1", + "@typescript-eslint/eslint-plugin": "^6.0.0", + "@typescript-eslint/parser": "^6.0.0", + "@vitejs/plugin-legacy": "^4.1.0", + "@vitejs/plugin-vue": "^4.2.3", + "@vitejs/plugin-vue-jsx": "^3.0.1", + "@vue/compiler-sfc": "^3.3.4", + "autoprefixer": "^10.4.14", "cross-env": "^7.0.3", - "dotenv": "^16.0.3", - "eslint": "^8.29.0", - "eslint-config-prettier": "^8.5.0", - "eslint-plugin-prettier": "^4.2.1", - "eslint-plugin-vue": "^9.8.0", - "esno": "^0.16.3", - "fs-extra": "^11.1.0", + "dotenv": "^16.3.1", + "eslint": "^8.44.0", + "eslint-config-prettier": "^8.8.0", + "eslint-plugin-prettier": "^5.0.0", + "eslint-plugin-vue": "^9.15.1", + "esno": "^0.17.0", + "fs-extra": "^11.1.1", "less": "^4.1.3", - "lint-staged": "13.1.0", + "lint-staged": "13.2.3", "picocolors": "^1.0.0", - "postcss": "^8.4.19", + "postcss": "^8.4.25", "postcss-html": "^1.5.0", "postcss-less": "^6.0.0", - "prettier": "^2.8.0", - "rimraf": "^3.0.2", - "rollup": "^3.20.1", - "rollup-plugin-visualizer": "^5.9.0", - "stylelint": "^14.16.0", - "stylelint-config-prettier": "^9.0.4", - "stylelint-config-recommended": "^9.0.0", - "stylelint-config-recommended-vue": "^1.4.0", - "stylelint-config-standard": "^29.0.0", - "stylelint-order": "^5.0.0", + "prettier": "^3.0.0", + "rimraf": "^5.0.1", + "rollup-plugin-visualizer": "^5.9.2", + "stylelint": "^15.10.1", + "stylelint-config-recommended": "^13.0.0", + "stylelint-config-recommended-vue": "^1.5.0", + "stylelint-config-standard": "^34.0.0", + "stylelint-order": "^6.0.3", "ts-node": "^10.9.1", - "typescript": "^5.0.2", - "vite": "^3.2.5", + "typescript": "^5.1.6", + "vite": "^4.4.9", "vite-plugin-compression": "^0.5.1", - "vite-plugin-html": "^3.2.0", + "vite-plugin-html-transform": "^1.0.0", "vite-plugin-imagemin": "^0.6.1", "vite-plugin-purge-icons": "^0.9.2", "vite-plugin-style-import": "^2.0.0", "vite-plugin-svg-icons": "^2.0.1", "vite-plugin-theme": "^0.8.6", - "vite-plugin-windicss": "^1.8.10", - "vue-eslint-parser": "^9.1.0", - "vue-tsc": "^1.2.0" - }, - "resolutions": { - "bin-wrapper": "npm:bin-wrapper-china", - "rollup": "^2.56.3", - "gifsicle": "5.2.0" + "vite-plugin-windicss": "^1.9.1", + "vue-eslint-parser": "^9.3.1", + "vue-tsc": "^1.8.4" }, "engines": { - "node": ">=16" + "node": ">=16.15.1", + "pnpm": ">=8.1.0" }, "lint-staged": { "*.{js,jsx,ts,tsx}": [ diff --git a/streampark-console/streampark-console-webapp/src/components/Form/src/hooks/useFormEvents.ts b/streampark-console/streampark-console-webapp/src/components/Form/src/hooks/useFormEvents.ts index 6409ce5c2..32bb12095 100644 --- a/streampark-console/streampark-console-webapp/src/components/Form/src/hooks/useFormEvents.ts +++ b/streampark-console/streampark-console-webapp/src/components/Form/src/hooks/useFormEvents.ts @@ -109,7 +109,7 @@ export function useFormEvents({ } else { nestKeyArray.forEach((nestKey: string) => { try { - const value = eval('values' + delimiter + nestKey); + const value = nestKey.split('.').reduce((out, item) => out[item], values); if (isDef(value)) { formModel[nestKey] = value; validKeys.push(nestKey); diff --git a/streampark-console/streampark-console-webapp/src/layouts/default/header/components/notify/index.vue b/streampark-console/streampark-console-webapp/src/layouts/default/header/components/notify/index.vue index b2bea1b87..534c73b31 100644 --- a/streampark-console/streampark-console-webapp/src/layouts/default/header/components/notify/index.vue +++ b/streampark-console/streampark-console-webapp/src/layouts/default/header/components/notify/index.vue @@ -137,8 +137,9 @@ listData.value[1].list = [...data]; } } + const wbSocketUrl = `${window.location.origin}${ - import.meta.env.VITE_GLOB_API_URL + import.meta.env.VITE_GLOB_API_URL + (import.meta.env.VITE_GLOB_API_URL_PREFIX || '') }/websocket/${userStore.getUserInfo.userId}`; const { data } = useWebSocket(wbSocketUrl.replace(/http/, 'ws'), { diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index 8331517c8..43308ae86 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -147,6 +147,11 @@ trait FlinkClientTrait extends Logger { } }) } + + setConfig(submitRequest, flinkConfig) + + doSubmit(submitRequest, flinkConfig) + } def setConfig(submitRequest: SubmitRequest, flinkConf: Configuration): Unit @@ -304,7 +309,6 @@ trait FlinkClientTrait extends Logger { submitRequest.appOption .filter( x => { - // 验证参数是否合法... val verify = commandLineOptions.hasOption(x._1) if (!verify) logWarn(s"param:${x._1} is error,skip it.") verify @@ -353,8 +357,6 @@ trait FlinkClientTrait extends Logger { logger.info(s"cliArgs: ${cliArgs.mkString(" ")}") - FlinkRunOption.parse(commandLineOptions, cliArgs, true) - val commandLine = FlinkRunOption.parse(commandLineOptions, cliArgs, true) val activeCommandLine = validateAndGetActiveCommandLine( @@ -401,86 +403,78 @@ trait FlinkClientTrait extends Logger { } private[this] def extractProgramArgs(submitRequest: SubmitRequest): JavaList[String] = { - val programArgs = new ArrayBuffer[String]() + val args = submitRequest.args - if (StringUtils.isNotEmpty(submitRequest.args)) { - val multiLineChar = "\"\"\"" - val array = submitRequest.args.split("\\s+") - if (!array.exists(_.startsWith(multiLineChar))) { + if (StringUtils.isNotEmpty(args)) { + val multiChar = "\"" + val array = args.split("\\s+") + if (!array.exists(_.startsWith(multiChar))) { array.foreach(programArgs +=) } else { val argsArray = new ArrayBuffer[String]() val tempBuffer = new ArrayBuffer[String]() - @tailrec def processElement(index: Int, multiLine: Boolean): Unit = { + @tailrec + def processElement(index: Int, multi: Boolean): Unit = { + if (index == array.length) { if (tempBuffer.nonEmpty) { argsArray += tempBuffer.mkString(" ") } return } + val next = index + 1 - val elem = array(index) + val elem = array(index).trim - if (elem.trim.nonEmpty) { - if (!multiLine) { - if (elem.startsWith(multiLineChar)) { - tempBuffer += elem.drop(3) - processElement(next, multiLine = true) - } else { - argsArray += elem - processElement(next, multiLine = false) - } - } else { - if (elem.endsWith(multiLineChar)) { - tempBuffer += elem.dropRight(3) + if (elem.isEmpty) { + processElement(next, multi = false) + } else { + if (multi) { + if (elem.endsWith(multiChar)) { + tempBuffer += elem.dropRight(1) argsArray += tempBuffer.mkString(" ") tempBuffer.clear() - processElement(next, multiLine = false) + processElement(next, multi = false) } else { tempBuffer += elem - processElement(next, multiLine) + processElement(next, multi) + } + } else { + val until = if (elem.endsWith(multiChar)) 1 else 0 + if (elem.startsWith(multiChar)) { + tempBuffer += elem.drop(1).dropRight(until) + processElement(next, multi = true) + } else { + argsArray += elem.dropRight(until) + processElement(next, multi = false) } } - } else { - tempBuffer += elem - processElement(next, multiLine = false) } } - processElement(0, multiLine = false) - argsArray.foreach(x => programArgs += x.trim) + processElement(0, multi = false) + argsArray.foreach(x => programArgs += x) } } if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) { - programArgs += PARAM_KEY_FLINK_CONF - programArgs += submitRequest.flinkYaml - programArgs += PARAM_KEY_APP_NAME - programArgs += DeflaterUtils.zipString(submitRequest.effectiveAppName) - programArgs += PARAM_KEY_FLINK_PARALLELISM - programArgs += getParallelism(submitRequest).toString + + programArgs += PARAM_KEY_FLINK_CONF += submitRequest.flinkYaml + programArgs += PARAM_KEY_APP_NAME += DeflaterUtils.zipString(submitRequest.effectiveAppName) + programArgs += PARAM_KEY_FLINK_PARALLELISM += getParallelism(submitRequest).toString + submitRequest.developmentMode match { case DevelopmentMode.FLINK_SQL => - programArgs += PARAM_KEY_FLINK_SQL - programArgs += submitRequest.flinkSQL + programArgs += PARAM_KEY_FLINK_SQL += submitRequest.flinkSQL if (submitRequest.appConf != null) { - programArgs += PARAM_KEY_APP_CONF - programArgs += submitRequest.appConf + programArgs += PARAM_KEY_APP_CONF += submitRequest.appConf } case _ if Try(!submitRequest.appConf.startsWith("json:")).getOrElse(true) => - programArgs += PARAM_KEY_APP_CONF - programArgs += submitRequest.appConf + programArgs += PARAM_KEY_APP_CONF += submitRequest.appConf } - } - // execution.runtime-mode - if (submitRequest.properties.nonEmpty) { - if (submitRequest.properties.containsKey(ExecutionOptions.RUNTIME_MODE.key())) { - programArgs += s"--${ExecutionOptions.RUNTIME_MODE.key()}" - programArgs += submitRequest.properties.get(ExecutionOptions.RUNTIME_MODE.key()).toString - } } programArgs.toList.asJava } @@ -491,8 +485,6 @@ trait FlinkClientTrait extends Logger { commandLine: CommandLine): Configuration = { require(activeCustomCommandLine != null, "activeCustomCommandLine must not be null.") - val executorConfig = activeCustomCommandLine.toConfiguration(commandLine) - val customConfiguration = new Configuration(executorConfig) val configuration = new Configuration() val flinkDefaultConfiguration = getFlinkDefaultConfiguration(flinkHome) flinkDefaultConfiguration.keySet.foreach( @@ -502,7 +494,7 @@ trait FlinkClientTrait extends Logger { case _ => } }) - configuration.addAll(customConfiguration) + configuration.addAll(activeCustomCommandLine.toConfiguration(commandLine)) configuration } diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala index 59c82f471..5495814f2 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala @@ -58,7 +58,13 @@ object KubernetesDeploymentHelper extends Logger { def getDeploymentStatusChanges(nameSpace: String, deploymentName: String): Boolean = { Try { val pods = getPods(nameSpace, deploymentName) - pods.head.getStatus.getContainerStatuses.head.getLastState.getTerminated != null + val podStatus = pods.head.getStatus + podStatus.getPhase match { + case "Unknown" => true + case "Failed" => true + case "Pending" => false + case _ => podStatus.getContainerStatuses.head.getLastState.getTerminated != null + } }.getOrElse(true) } diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala index e03d3ae5a..d07e2aff8 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala @@ -17,6 +17,8 @@ package org.apache.streampark.flink.kubernetes.ingress +import org.apache.streampark.common.conf.{ConfigConst, InternalConfigHolder, K8sFlinkConfig} + import io.fabric8.kubernetes.api.model.{OwnerReference, OwnerReferenceBuilder} import io.fabric8.kubernetes.client.DefaultKubernetesClient import org.apache.commons.io.FileUtils @@ -46,11 +48,16 @@ trait IngressStrategy { } def buildIngressAnnotations(clusterId: String): Map[String, String] = { - Map( + val annotations = Map( "nginx.ingress.kubernetes.io/rewrite-target" -> "/$2", "nginx.ingress.kubernetes.io/proxy-body-size" -> "1024m", "nginx.ingress.kubernetes.io/configuration-snippet" -> ("rewrite ^(/" + clusterId + ")$ $1/ permanent;") ) + val ingressClass = InternalConfigHolder.get[String](K8sFlinkConfig.ingressClass) + if (ingressClass.nonEmpty) { + annotations + ("kubernetes.io/ingress.class" -> ingressClass) + } + annotations } def buildIngressLabels(clusterId: String): Map[String, String] = { diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala index 8fb5711e6..c5c2fdd0b 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1.scala @@ -40,7 +40,7 @@ class IngressStrategyV1 extends IngressStrategy { Option(client.network.v1.ingresses.inNamespace(nameSpace).withName(clusterId).get) .map(ingress => ingress.getSpec.getRules.get(0)) .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath) - .map { case (host, path) => s"https://$host$path" } + .map { case (host, path) => s"http://$host$path" } .getOrElse(clusterClient.getWebInterfaceURL) }.recover { case e => diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala index 97654796c..8cbc276e4 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategyV1beta1.scala @@ -41,7 +41,7 @@ class IngressStrategyV1beta1 extends IngressStrategy { Option(client.network.v1beta1.ingresses.inNamespace(nameSpace).withName(clusterId).get) .map(ingress => ingress.getSpec.getRules.get(0)) .map(rule => rule.getHost -> rule.getHttp.getPaths.get(0).getPath) - .map { case (host, path) => s"https://$host$path" } + .map { case (host, path) => s"http://$host$path" } .getOrElse(clusterClient.getWebInterfaceURL) }.recover { case e => diff --git a/streampark-storage/pom.xml b/streampark-storage/pom.xml deleted file mode 100644 index a427da312..000000000 --- a/streampark-storage/pom.xml +++ /dev/null @@ -1,81 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.streampark</groupId> - <artifactId>streampark</artifactId> - <version>2.1.2</version> - </parent> - - <artifactId>streampark-storage</artifactId> - <name>StreamPark : Storage Service</name> - - <properties> - <maven.compiler.source>8</maven.compiler.source> - <maven.compiler.target>8</maven.compiler.target> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <aliyun.oss.version>3.15.0</aliyun.oss.version> - </properties> - - <dependencies> - <dependency> - <groupId>org.projectlombok</groupId> - <artifactId>lombok</artifactId> - </dependency> - - <dependency> - <groupId>com.aliyun.oss</groupId> - <artifactId>aliyun-sdk-oss</artifactId> - <version>${aliyun.oss.version}</version> - <exclusions> - <exclusion> - <groupId>org.jdom</groupId> - <artifactId>jdom2</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - </dependency> - - <!--test--> - <dependency> - <groupId>org.junit.jupiter</groupId> - <artifactId>junit-jupiter-engine</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-deploy-plugin</artifactId> - <configuration> - <skip>true</skip> - </configuration> - </plugin> - </plugins> - </build> - -</project> diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java b/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java deleted file mode 100644 index e0ff3201c..000000000 --- a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageService.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.storage; - -/** - * StorageService will be used as artifacts fetcher in pod template, so it should rely on other - * modules. - */ -public interface StorageService { - void getData(String objectPath, String localFilePath) throws Exception; - - void putData(String objectPath, String localFilePath) throws Exception; -} diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageServiceConfig.java b/streampark-storage/src/main/java/org/apache/streampark/storage/StorageServiceConfig.java deleted file mode 100644 index 8e17e33d1..000000000 --- a/streampark-storage/src/main/java/org/apache/streampark/storage/StorageServiceConfig.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.storage; - -public abstract class StorageServiceConfig {} diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssConfig.java b/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssConfig.java deleted file mode 100644 index 986e98f73..000000000 --- a/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssConfig.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.storage.oss; - -import org.apache.streampark.storage.StorageServiceConfig; - -import lombok.Data; - -@Data -public class OssConfig extends StorageServiceConfig { - private String accessKeyId; - private String accessKeySecret; - private String endpoint; - private String bucket; - private String baseUri; -} diff --git a/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssStorageService.java b/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssStorageService.java deleted file mode 100644 index 98cffa270..000000000 --- a/streampark-storage/src/main/java/org/apache/streampark/storage/oss/OssStorageService.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.storage.oss; - -import org.apache.streampark.storage.StorageService; - -import com.aliyun.oss.ClientException; -import com.aliyun.oss.OSS; -import com.aliyun.oss.OSSClientBuilder; -import com.aliyun.oss.OSSException; -import com.aliyun.oss.model.GetObjectRequest; -import com.aliyun.oss.model.PutObjectRequest; -import com.google.common.annotations.VisibleForTesting; -import lombok.extern.slf4j.Slf4j; - -import java.io.File; - -@Slf4j -public class OssStorageService implements StorageService { - - final OssConfig ossConfig; - final OSS ossClient; - - public OssStorageService(OssConfig config) { - this.ossConfig = config; - this.ossClient = - new OSSClientBuilder() - .build( - ossConfig.getEndpoint(), - ossConfig.getAccessKeyId(), - ossConfig.getAccessKeySecret()); - } - - @Override - public void getData(String objectPath, String localFilePath) throws Exception { - String bucket = ossConfig.getBucket(); - - if (!ossClient.doesObjectExist(bucket, objectPath)) { - throw new RuntimeException(String.format("File '%s' not exist", objectPath)); - } - - try { - ossClient.getObject(new GetObjectRequest(bucket, objectPath), new File(localFilePath)); - } catch (Exception e) { - log.error("GetData failed. ObjectPath: {}, local path: {}.", objectPath, localFilePath, e); - throw handleOssException(e); - } - } - - @Override - public void putData(String objectPath, String localFilePath) throws Exception { - try { - PutObjectRequest putObjectRequest = - new PutObjectRequest(ossConfig.getBucket(), objectPath, new File(localFilePath)); - ossClient.putObject(putObjectRequest); - } catch (Exception e) { - log.error("PutData failed. ObjectPath: {}, local path: {}.", objectPath, localFilePath, e); - throw handleOssException(e); - } - } - - @VisibleForTesting - static RuntimeException handleOssException(Exception e) { - if (e instanceof OSSException) { - OSSException oe = (OSSException) e; - String errMsg = - String.format( - "Caught an OSSException. Error Message: %s." + " Error Code: %s. Request ID: %s", - oe.getErrorMessage(), oe.getErrorCode(), oe.getRequestId()); - return new RuntimeException(errMsg, oe); - } else if (e instanceof ClientException) { - ClientException ce = (ClientException) e; - String errMsg = - String.format("Caught an ClientException. Error Message: %s.", ce.getMessage()); - return new RuntimeException(errMsg, ce); - } else { - return new RuntimeException(e); - } - } -} diff --git a/streampark-storage/src/test/java/org/apache/streampark/storage/oss/OssStorageServiceTest.java b/streampark-storage/src/test/java/org/apache/streampark/storage/oss/OssStorageServiceTest.java deleted file mode 100644 index 48ab4d939..000000000 --- a/streampark-storage/src/test/java/org/apache/streampark/storage/oss/OssStorageServiceTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.storage.oss; - -import com.aliyun.oss.ClientException; -import com.aliyun.oss.OSSException; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -class OssStorageServiceTest { - - @Test - void testHandleException() { - OSSException ossException = - new OSSException( - "mock error", "MOCK_CODE", "requestId", "hostId", "header", "resource", "GET"); - RuntimeException exp = OssStorageService.handleOssException(ossException); - Assertions.assertEquals( - "Caught an OSSException. Error Message: mock error. Error Code: MOCK_CODE. Request ID: requestId", - exp.getMessage()); - - ClientException ossClientException = new ClientException("Client ERROR"); - exp = OssStorageService.handleOssException(ossClientException); - Assertions.assertTrue( - exp.getMessage().startsWith("Caught an ClientException. Error Message: Client ERROR")); - } -}
