This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new d5ca0346b [Fix]: spark config error (#4070)
d5ca0346b is described below
commit d5ca0346be74931009272ba0719fc973fafd27c3
Author: Kriszu <[email protected]>
AuthorDate: Wed Sep 18 19:35:13 2024 +0800
[Fix]: spark config error (#4070)
* [Fix]: spark config error
* fix: copy app
---
.../src/api/model/baseModel.ts | 2 +-
.../src/api/resource/upload/index.ts | 2 +-
.../streampark-console-webapp/src/api/spark/app.ts | 2 +-
.../src/views/spark/app/create.vue | 14 ++---
.../src/views/spark/app/edit.vue | 58 +++++++++++++++-----
.../src/views/spark/app/hooks/useAppFormSchema.tsx | 40 +++++++++++---
.../src/views/spark/app/hooks/useSparkAction.tsx | 4 +-
.../src/views/spark/home/components/Modal.vue | 63 ++++++++++------------
8 files changed, 117 insertions(+), 68 deletions(-)
diff --git
a/streampark-console/streampark-console-webapp/src/api/model/baseModel.ts
b/streampark-console/streampark-console-webapp/src/api/model/baseModel.ts
index 50040e750..9e744bc49 100644
--- a/streampark-console/streampark-console-webapp/src/api/model/baseModel.ts
+++ b/streampark-console/streampark-console-webapp/src/api/model/baseModel.ts
@@ -27,6 +27,6 @@ export interface BasicFetchResult<T> {
export interface BasicTableParams {
page: number;
pageSize: number;
- teamId: string;
+ teamId?: string;
[key: string]: Nullable<string | number>;
}
diff --git
a/streampark-console/streampark-console-webapp/src/api/resource/upload/index.ts
b/streampark-console/streampark-console-webapp/src/api/resource/upload/index.ts
index 1ad2809f1..077278931 100644
---
a/streampark-console/streampark-console-webapp/src/api/resource/upload/index.ts
+++
b/streampark-console/streampark-console-webapp/src/api/resource/upload/index.ts
@@ -86,7 +86,7 @@ export function checkResource(data: ResourceParam):
Promise<AxiosResponse<Result
}
export function fetchUpload(params) {
- return defHttp.post<string>({
+ return defHttp.post({
url: RESOURCE_API.UPLOAD,
params,
headers: {
diff --git a/streampark-console/streampark-console-webapp/src/api/spark/app.ts
b/streampark-console/streampark-console-webapp/src/api/spark/app.ts
index f4f639eff..67e363b58 100644
--- a/streampark-console/streampark-console-webapp/src/api/spark/app.ts
+++ b/streampark-console/streampark-console-webapp/src/api/spark/app.ts
@@ -41,7 +41,7 @@ export function fetchCreateSparkApp(data: SparkApplication) {
* @param params get parameters
*/
export function fetchCopySparkApp(data: SparkApplication) {
- return defHttp.post({ url: `${apiPrefix}/copy`, data });
+ return defHttp.post({ url: `${apiPrefix}/copy`, data }, {
isTransformResponse: false });
}
/**
* update spark application information
diff --git
a/streampark-console/streampark-console-webapp/src/views/spark/app/create.vue
b/streampark-console/streampark-console-webapp/src/views/spark/app/create.vue
index 9030436ff..69304f8bb 100644
---
a/streampark-console/streampark-console-webapp/src/views/spark/app/create.vue
+++
b/streampark-console/streampark-console-webapp/src/views/spark/app/create.vue
@@ -79,12 +79,6 @@
}
/* spark sql mode */
async function handleSQLMode(values: Recordable) {
- let config = values.configOverride;
- if (config != null && config !== undefined && config.trim() != '') {
- config = encryptByBase64(config);
- } else {
- config = null;
- }
handleCreateAction({
jobType: JobTypeEnum.SQL,
executionMode: values.executionMode,
@@ -97,7 +91,7 @@
tags: values.tags,
yarnQueue: values.yarnQueue,
resourceFrom: ResourceFromEnum.UPLOAD,
- config,
+ config: values.config,
appProperties: values.appProperties,
appArgs: values.args,
hadoopUser: values.hadoopUser,
@@ -106,6 +100,12 @@
}
/* Submit to create */
async function handleAppSubmit(formValue: Recordable) {
+ let config = formValue.configOverride;
+ if (config != null && config !== undefined && config.trim() != '') {
+ formValue.config = encryptByBase64(config);
+ } else {
+ formValue.config = null;
+ }
if (formValue.jobType == JobTypeEnum.SQL) {
if (formValue.sparkSql == null || formValue.sparkSql.trim() === '') {
createMessage.warning(t('spark.app.addAppTips.sparkSqlIsRequiredMessage'));
diff --git
a/streampark-console/streampark-console-webapp/src/views/spark/app/edit.vue
b/streampark-console/streampark-console-webapp/src/views/spark/app/edit.vue
index baf967252..f42a457d5 100644
--- a/streampark-console/streampark-console-webapp/src/views/spark/app/edit.vue
+++ b/streampark-console/streampark-console-webapp/src/views/spark/app/edit.vue
@@ -17,14 +17,14 @@
<script setup lang="ts">
import { useGo } from '/@/hooks/web/usePage';
import AppForm from './components/AppForm.vue';
- import { onMounted, ref } from 'vue';
+ import { nextTick, onMounted, ref } from 'vue';
import { PageWrapper } from '/@/components/Page';
import { useMessage } from '/@/hooks/web/useMessage';
import { createLocalStorage } from '/@/utils/cache';
import { buildUUID } from '/@/utils/uuid';
import { useI18n } from '/@/hooks/web/useI18n';
- import { encryptByBase64 } from '/@/utils/cipher';
+ import { decodeByBase64, encryptByBase64 } from '/@/utils/cipher';
import { AppTypeEnum, JobTypeEnum, ResourceFromEnum } from
'/@/enums/flinkEnum';
import { fetchGetSparkApp, fetchUpdateSparkApp } from '/@/api/spark/app';
import { SparkApplication } from '/@/api/spark/app.type';
@@ -36,7 +36,9 @@
name: 'SparkApplicationAction',
});
const go = useGo();
- const sparkSql = ref();
+ const appFormRef = ref<{
+ sparkSql: any;
+ } | null>(null);
const { t } = useI18n();
const sparkApp = ref<SparkApplication>({});
@@ -48,8 +50,23 @@
async function handleAppFieldValue() {
const appId = route.query.appId;
+
const res = await fetchGetSparkApp({ id: appId as string });
+ let isSetConfig = false;
+ let configOverride = '';
+ if (res.config && res.config.trim() !== '') {
+ configOverride = decodeByBase64(res.config);
+ isSetConfig = true;
+ }
+ Object.assign(res, {
+ sparkSql: res.sparkSql ? decodeByBase64(res.sparkSql) : '',
+ isSetConfig,
+ configOverride,
+ });
sparkApp.value = res;
+ nextTick(() => {
+ if (res.sparkSql)
appFormRef.value?.sparkSql?.setContent(decodeByBase64(res.sparkSql));
+ });
return res;
}
@@ -77,12 +94,6 @@
}
/* spark sql mode */
async function handleSQLMode(values: Recordable) {
- let config = values.configOverride;
- if (config != null && config !== undefined && config.trim() != '') {
- config = encryptByBase64(config);
- } else {
- config = null;
- }
handleUpdateAction({
jobType: JobTypeEnum.SQL,
executionMode: values.executionMode,
@@ -95,7 +106,7 @@
tags: values.tags,
yarnQueue: values.yarnQueue,
resourceFrom: ResourceFromEnum.UPLOAD,
- config,
+ config: values.config,
appProperties: values.appProperties,
appArgs: values.args,
hadoopUser: values.hadoopUser,
@@ -104,19 +115,33 @@
}
/* Submit to create */
async function handleAppSubmit(formValue: Recordable) {
+ let config = formValue.configOverride;
+ console.log(config, formValue);
+ if (config != null && config !== undefined && config.trim() != '') {
+ config = encryptByBase64(config);
+ } else {
+ config = null;
+ }
+
if (formValue.jobType == JobTypeEnum.SQL) {
if (formValue.sparkSql == null || formValue.sparkSql.trim() === '') {
createMessage.warning(t('spark.app.addAppTips.sparkSqlIsRequiredMessage'));
} else {
- const access = await sparkSql?.value?.handleVerifySql();
+ const access = await appFormRef?.value?.sparkSql?.handleVerifySql();
if (!access) {
createMessage.warning(t('spark.app.addAppTips.sqlCheck'));
throw new Error(access);
}
}
- handleSQLMode(formValue);
+ handleSQLMode({
+ ...formValue,
+ config,
+ });
} else {
- handleCustomJobMode(formValue);
+ handleCustomJobMode({
+ ...formValue,
+ config,
+ });
}
}
/* send create request */
@@ -155,6 +180,11 @@
<template>
<PageWrapper contentFullHeight contentBackground contentClass="p-26px
app_controller">
- <AppForm :initFormFn="handleAppFieldValue" :submit="handleAppSubmit"
:spark-envs="sparkEnvs" />
+ <AppForm
+ ref="appFormRef"
+ :initFormFn="handleAppFieldValue"
+ :submit="handleAppSubmit"
+ :spark-envs="sparkEnvs"
+ />
</PageWrapper>
</template>
diff --git
a/streampark-console/streampark-console-webapp/src/views/spark/app/hooks/useAppFormSchema.tsx
b/streampark-console/streampark-console-webapp/src/views/spark/app/hooks/useAppFormSchema.tsx
index e0e1a677f..55a2735f7 100644
---
a/streampark-console/streampark-console-webapp/src/views/spark/app/hooks/useAppFormSchema.tsx
+++
b/streampark-console/streampark-console-webapp/src/views/spark/app/hooks/useAppFormSchema.tsx
@@ -22,24 +22,21 @@ import { ResourceFromEnum } from '/@/enums/flinkEnum';
import type { SparkEnv } from '/@/api/spark/home.type';
import type { RuleObject } from 'ant-design-vue/lib/form';
import type { StoreValue } from 'ant-design-vue/lib/form/interface';
-import {
- renderIsSetConfig,
- renderStreamParkResource,
- renderYarnQueue,
- sparkJobTypeMap,
-} from './useSparkRender';
+import { renderIsSetConfig, renderStreamParkResource, sparkJobTypeMap } from
'./useSparkRender';
import { executionModes } from '../data';
import { useDrawer } from '/@/components/Drawer';
import { fetchVariableAll } from '/@/api/resource/variable';
import { fetchTeamResource } from '/@/api/resource/upload';
import { fetchCheckSparkName } from '/@/api/spark/app';
import { useRoute } from 'vue-router';
-import { Alert } from 'ant-design-vue';
+import { Alert, Select, Tag } from 'ant-design-vue';
+import { fetchYarnQueueList } from '/@/api/setting/yarnQueue';
export function useSparkSchema(sparkEnvs: Ref<SparkEnv[]>) {
const { t } = useI18n();
const route = useRoute();
const teamResource = ref<Array<any>>([]);
+ const yarnQueue = ref<Array<any>>([]);
const suggestions = ref<Array<{ text: string; description: string; value:
string }>>([]);
const [registerConfDrawer, { openDrawer: openConfDrawer }] = useDrawer();
@@ -211,8 +208,29 @@ export function useSparkSchema(sparkEnvs: Ref<SparkEnv[]>)
{
field: 'yarnQueue',
label: t('spark.app.yarnQueue'),
component: 'Input',
- render: (renderCallbackParams) =>
renderYarnQueue(renderCallbackParams),
+ render: ({ model, field }) => {
+ return (
+ <div>
+ <Select
+ name="yarnQueue"
+
placeholder={t('setting.yarnQueue.placeholder.yarnQueueLabelExpression')}
+ fieldNames={{ label: 'queueLabel', value: 'queueLabel' }}
+ v-model={[model[field], 'value']}
+ showSearch={true}
+ />
+ <p class="conf-desc mt-10px">
+ <span class="note-info">
+ <Tag color="#2db7f5" class="tag-note">
+ {t('flink.app.noteInfo.note')}
+ </Tag>
+ {t('setting.yarnQueue.selectionHint')}
+ </span>
+ </p>
+ </div>
+ );
+ },
},
+ { field: 'configOverride', label: '', component: 'Input', show: false },
{
field: 'isSetConfig',
label: t('spark.app.appConf'),
@@ -257,6 +275,12 @@ export function useSparkSchema(sparkEnvs: Ref<SparkEnv[]>)
{
fetchTeamResource({}).then((res) => {
teamResource.value = res;
});
+ fetchYarnQueueList({
+ page: 1,
+ pageSize: 9999,
+ }).then((res) => {
+ yarnQueue.value = res.records;
+ });
fetchVariableAll().then((res) => {
suggestions.value = res.map((v) => {
return {
diff --git
a/streampark-console/streampark-console-webapp/src/views/spark/app/hooks/useSparkAction.tsx
b/streampark-console/streampark-console-webapp/src/views/spark/app/hooks/useSparkAction.tsx
index f0a9cb9c9..37d1e5091 100644
---
a/streampark-console/streampark-console-webapp/src/views/spark/app/hooks/useSparkAction.tsx
+++
b/streampark-console/streampark-console-webapp/src/views/spark/app/hooks/useSparkAction.tsx
@@ -290,11 +290,11 @@ export const useSparkAction = (optionApps: Recordable) =>
{
const code = parseInt(resp);
if (code === 0) {
try {
- const { data } = await fetchCopySparkApp({
+ const res = await fetchCopySparkApp({
id: item.id,
appName: copyAppName,
});
- const status = data.status || 'error';
+ const status = res.status || 'error';
if (status === 'success') {
Swal.fire({
icon: 'success',
diff --git
a/streampark-console/streampark-console-webapp/src/views/spark/home/components/Modal.vue
b/streampark-console/streampark-console-webapp/src/views/spark/home/components/Modal.vue
index cb628357b..cd58b268d 100644
---
a/streampark-console/streampark-console-webapp/src/views/spark/home/components/Modal.vue
+++
b/streampark-console/streampark-console-webapp/src/views/spark/home/components/Modal.vue
@@ -83,43 +83,35 @@
/* form submit */
async function handleSubmit() {
- changeOkLoading(true);
- let formValue;
try {
- formValue = await validate();
- } catch (error) {
- console.warn('validate error:', error);
- return;
- } finally {
- changeOkLoading(false);
- }
- // Detection environment
- const resp = await fetchSparkEnvCheck({
- id: versionId.value,
- sparkName: formValue.sparkName,
- sparkHome: formValue.sparkHome,
- });
- const checkResp = parseInt(resp);
- if (checkResp !== SparkEnvCheckEnum.OK) {
- switch (checkResp) {
- case SparkEnvCheckEnum.INVALID_PATH:
- Swal.fire('Failed', t('spark.home.tips.sparkHomePathIsInvalid'),
'error');
- break;
- case SparkEnvCheckEnum.NAME_REPEATED:
- Swal.fire('Failed', t('spark.home.tips.sparkNameIsRepeated'),
'error');
- break;
- case SparkEnvCheckEnum.SPARK_DIST_NOT_FOUND:
- Swal.fire('Failed', t('spark.home.tips.sparkDistNotFound'), 'error');
- break;
- case SparkEnvCheckEnum.SPARK_DIST_REPEATED:
- Swal.fire('Failed', t('spark.home.tips.sparkDistIsRepeated'),
'error');
- break;
+ const formValue = await validate();
+ changeOkLoading(true);
+ // Detection environment
+ const resp = await fetchSparkEnvCheck({
+ id: versionId.value,
+ sparkName: formValue.sparkName,
+ sparkHome: formValue.sparkHome,
+ });
+ const checkResp = parseInt(resp);
+ if (checkResp !== SparkEnvCheckEnum.OK) {
+ switch (checkResp) {
+ case SparkEnvCheckEnum.INVALID_PATH:
+ Swal.fire('Failed', t('spark.home.tips.sparkHomePathIsInvalid'),
'error');
+ break;
+ case SparkEnvCheckEnum.NAME_REPEATED:
+ Swal.fire('Failed', t('spark.home.tips.sparkNameIsRepeated'),
'error');
+ break;
+ case SparkEnvCheckEnum.SPARK_DIST_NOT_FOUND:
+ Swal.fire('Failed', t('spark.home.tips.sparkDistNotFound'),
'error');
+ break;
+ case SparkEnvCheckEnum.SPARK_DIST_REPEATED:
+ Swal.fire('Failed', t('spark.home.tips.sparkDistIsRepeated'),
'error');
+ break;
+ }
+ changeOkLoading(false);
+ return;
}
- changeOkLoading(false);
- return;
- }
- try {
let message: string;
let success = false;
// create
@@ -156,6 +148,9 @@
} else {
Swal.fire('Failed', message.replaceAll(/\[StreamPark]/g, ''), 'error');
}
+ } catch (error) {
+ console.warn('validate error:', error);
+ return;
} finally {
changeOkLoading(false);
}