This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 83d741ac2 [Feature] The front-end is modified to support the cdc yaml api (#4177)
83d741ac2 is described below
commit 83d741ac2b534f7b715bde816f988781fd4bacea
Author: ouyangwulin <[email protected]>
AuthorDate: Mon Jan 20 21:14:59 2025 +0800
[Feature] The front-end is modified to support the cdc yaml api (#4177)
---
.../streampark-console-webapp/package.json | 6 +++--
.../src/enums/flinkEnum.ts | 1 +
.../src/locales/lang/en/flink/app.ts | 2 ++
.../src/locales/lang/zh-CN/flink/app.ts | 2 ++
.../src/views/flink/app/Add.vue | 9 ++++---
.../src/views/flink/app/EditStreamPark.vue | 10 +++++---
.../src/views/flink/app/View.vue | 3 +++
.../src/views/flink/app/components/FlinkSql.vue | 27 ++++++++++++++++++-
.../src/views/flink/app/hooks/useAppTableAction.ts | 3 ++-
.../flink/app/hooks/useCreateAndEditSchema.ts | 12 +++++----
.../src/views/flink/app/hooks/useCreateSchema.ts | 30 ++++++++++++++--------
11 files changed, 79 insertions(+), 26 deletions(-)
diff --git a/streampark-console/streampark-console-webapp/package.json b/streampark-console/streampark-console-webapp/package.json
index 9e3ce9bac..d7cfeb0fd 100644
--- a/streampark-console/streampark-console-webapp/package.json
+++ b/streampark-console/streampark-console-webapp/package.json
@@ -65,7 +65,8 @@
"vue": "^3.3.4",
"vue-i18n": "^9.2.2",
"vue-router": "^4.2.4",
- "vue-types": "^5.1.0"
+ "vue-types": "^5.1.0",
+ "js-yaml": "^4.1.0"
},
"devDependencies": {
"@iconify/json": "^2.2.89",
@@ -118,7 +119,8 @@
"vite-plugin-theme": "^0.8.6",
"vite-plugin-windicss": "^1.9.1",
"vue-eslint-parser": "^9.3.1",
- "vue-tsc": "^1.8.4"
+ "vue-tsc": "^1.8.4",
+ "js-yaml": "^4.1.0"
},
"engines": {
"node": ">=16.15.1 <= 18",
diff --git a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
index 7c078d5e5..d7e72ff6c 100644
--- a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
+++ b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
@@ -151,6 +151,7 @@ export enum JobTypeEnum {
JAR = 1,
SQL = 2,
PYFLINK = 3,
+ CDC = 4,
}
export enum ConfigTypeEnum {
diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
index ab0bc56b8..7840b7da4 100644
--- a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
+++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
@@ -205,8 +205,10 @@ export default {
editStreamPark: {
success: 'update successful',
flinkSqlRequired: 'Flink Sql is required',
+ yamlRequired: 'Yaml is required',
appidCheck: 'appid can not be empty',
sqlCheck: 'SQL check error',
+ yamlCheck: 'Yaml check error',
},
operation: {
edit: 'Edit Job',
diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
index 11a8b417a..5ba959233 100644
--- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
+++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
@@ -195,6 +195,8 @@ export default {
flinkSqlRequired: 'Flink Sql 为必填项',
appidCheck: 'appid 不能为空',
sqlCheck: 'SQL 检查错误',
+ yamlRequired: 'Yaml 不能为空',
+ yamlCheck: 'Yaml 检查错误',
},
operation: {
edit: '编辑作业',
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
index 4684f7ca0..da1910f09 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
@@ -263,13 +263,15 @@
async function handleAppCreate(formValue: Recordable) {
try {
submitLoading.value = true;
- if (formValue.jobType == JobTypeEnum.SQL) {
+ if (formValue.jobType == JobTypeEnum.SQL || formValue.jobType ==
JobTypeEnum.CDC) {
if (formValue.flinkSql == null || formValue.flinkSql.trim() === '') {
-
createMessage.warning(t('flink.app.editStreamPark.flinkSqlRequired'));
+ const errorMsg = formValue.jobType == JobTypeEnum.SQL ?
t('flink.app.editStreamPark.flinkSqlRequired') :
t('flink.app.editStreamPark.yamlRequired')
+ createMessage.warning(errorMsg);
} else {
const access = await flinkSql?.value?.handleVerifySql();
if (!access) {
- createMessage.warning(t('flink.app.editStreamPark.sqlCheck'));
+ const errorMsg = formValue.jobType == JobTypeEnum.SQL ?
t('flink.app.editStreamPark.sqlCheck') : t('flink.app.editStreamPark.yamlCheck')
+ createMessage.warning(errorMsg);
throw new Error(access);
}
}
@@ -316,6 +318,7 @@
v-model:value="model[field]"
:versionId="model['versionId']"
:suggestions="suggestions"
+ :jobType="Number(model['jobType'])"
@preview="(value) => openReviewDrawer(true, { value, suggestions })"
/>
</template>
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
index 2cc118ceb..1180730bf 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
@@ -164,13 +164,15 @@
async function handleAppUpdate(values) {
try {
submitLoading.value = true;
- if (app.jobType == JobTypeEnum.SQL) {
+ if (app.jobType == JobTypeEnum.SQL || app.jobType == JobTypeEnum.CDC) {
if (values.flinkSql == null || values.flinkSql.trim() === '') {
-
createMessage.warning(t('flink.app.editStreamPark.flinkSqlRequired'));
+ const errorMsg = app.jobType == JobTypeEnum.SQL ?
t('flink.app.editStreamPark.flinkSqlRequired') :
t('flink.app.editStreamPark.yamlRequired')
+ createMessage.warning(errorMsg);
} else {
const access = await flinkSql?.value?.handleVerifySql();
if (!access) {
- createMessage.warning(t('flink.app.editStreamPark.sqlCheck'));
+ const errorMsg = app.jobType == JobTypeEnum.SQL ?
t('flink.app.editStreamPark.sqlCheck') : t('flink.app.editStreamPark.yamlCheck')
+ createMessage.warning(errorMsg);
throw new Error(access);
}
handleSubmitSQL(values);
@@ -283,7 +285,7 @@
Object.assign(app, res);
Object.assign(defaultOptions, JSON.parse(app.options || '{}'));
- if (app.jobType == JobTypeEnum.SQL) {
+ if (app.jobType == JobTypeEnum.SQL || app.jobType == JobTypeEnum.CDC) {
fetchFlinkHistory({ id: appId }).then((res) => {
flinkSqlHistory.value = res;
});
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue
index 99cea85a2..e33a45a82 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue
@@ -310,6 +310,7 @@
:options="[
{ label: 'JAR', value: JobTypeEnum.JAR },
{ label: 'SQL', value: JobTypeEnum.SQL },
+ { label: 'FlinkCDC', value: JobTypeEnum.CDC },
]"
/>
</Form.Item>
@@ -349,6 +350,7 @@
<template v-if="column.dataIndex === 'jobName'">
<span class="app_type app_jar" v-if="record['jobType'] ===
JobTypeEnum.JAR"> JAR </span>
<span class="app_type app_sql" v-if="record['jobType'] ===
JobTypeEnum.SQL"> SQL </span>
+ <span class="app_type app_sql" v-if="record['jobType'] ===
JobTypeEnum.CDC"> CDC </span>
<span class="link cursor-pointer" @click="handleJobView(record)">
<Popover :title="t('common.detailText')">
<template #content>
@@ -361,6 +363,7 @@
<Tag color="blue">
<span v-if="record['jobType'] == JobTypeEnum.JAR"> JAR
</span>
<span v-if="record['jobType'] == JobTypeEnum.SQL"> SQL
</span>
+ <span v-if="record['jobType'] == JobTypeEnum.CDC">
FlinkCDC </span>
</Tag>
</div>
<div class="pt-2px flex">
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/FlinkSql.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/FlinkSql.vue
index 865709c7f..05d94cc00 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/FlinkSql.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/FlinkSql.vue
@@ -35,6 +35,8 @@
import { format } from '../FlinkSqlFormatter';
import { useI18n } from '/@/hooks/web/useI18n';
import { useFullContent } from '/@/hooks/event/useFullscreen';
+ import { JobTypeEnum } from '/@/enums/flinkEnum';
+ import YAML from 'js-yaml';
const ButtonGroup = Button.Group;
const { t } = useI18n();
@@ -65,6 +67,9 @@
type: Array as PropType<Array<{ text: string; description: string }>>,
default: () => [],
},
+ jobType: {
+ type: Number
+ }
});
const defaultValue = '';
@@ -79,7 +84,23 @@
createMessage.error(t('flink.app.dependencyError'));
return false;
} else {
- try {
+ console.log(props.jobType)
+ if (props.jobType === JobTypeEnum.CDC) {
+ try {
+ YAML.load(props.value);
+ verifyRes.verified = true;
+ verifyRes.errorMsg = '';
+ syntaxError();
+ return true;
+ } catch (error) {
+ verifyRes.errorStart = 0;
+ verifyRes.errorEnd = 0;
+ verifyRes.errorMsg = `${error.name}: ${error.reason} at line
${error.mark.line},cloumn ${error.mark.column}`;
+ syntaxError();
+ return false;
+ }
+ } else {
+ try {
const { data } = await fetchFlinkSqlVerify({
sql: props.value,
versionId: props.versionId,
@@ -111,6 +132,7 @@
console.error(error);
return false;
}
+ }
}
}
@@ -140,6 +162,9 @@
/* format */
function handleFormatSql() {
if (isEmpty(props.value)) return;
+ if (props.jobType === JobTypeEnum.CDC) {
+ return false;
+ }
const formatSql = format(props.value);
setContent(formatSql);
}
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts
index c65ca155f..7fc1130bc 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts
@@ -232,7 +232,7 @@ export const useAppTableAction = (
sessionStorage.setItem('appPageNo', String(currentPageNo || 1));
flinkAppStore.setApplicationId(app.id);
if (app.appType == AppTypeEnum.STREAMPARK_FLINK) {
- // jobType( 1 flinkJar 2: flinkSQL)
+ // jobType( 1 custom code 2: flinkSQL)
router.push({ path: '/flink/app/edit_streampark', query: { appId: app.id
} });
} else if (app.appType == AppTypeEnum.APACHE_FLINK) {
//Apache Flink
@@ -314,6 +314,7 @@ export const useAppTableAction = (
{ label: 'JAR', value: JobTypeEnum.JAR },
{ label: 'SQL', value: JobTypeEnum.SQL },
{ label: 'PYFLINK', value: JobTypeEnum.PYFLINK },
+ { label: 'FLINK CDC', value: JobTypeEnum.CDC },
],
onChange: handlePageDataReload.bind(null, false),
},
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index 62f20eab5..25e0f1995 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -109,7 +109,7 @@ export const useCreateAndEditSchema = (
label: 'Flink SQL',
component: 'Input',
slot: 'flinkSql',
- ifShow: ({ values }) => values?.jobType == JobTypeEnum.SQL,
+ ifShow: ({ values }) => values?.jobType == JobTypeEnum.SQL ||
values?.jobType == JobTypeEnum.CDC,
rules: [{ required: true, message:
t('flink.app.addAppTips.flinkSqlIsRequiredMessage') }],
},
{
@@ -117,7 +117,7 @@ export const useCreateAndEditSchema = (
label: t('flink.app.resource'),
component: 'Select',
render: ({ model }) => renderStreamParkResource({ model, resources:
unref(teamResource) }),
- ifShow: ({ values }) => values.jobType == JobTypeEnum.SQL,
+ ifShow: ({ values }) => values.jobType == JobTypeEnum.SQL ||
values.jobType == JobTypeEnum.CDC,
},
{
field: 'dependency',
@@ -132,7 +132,7 @@ export const useCreateAndEditSchema = (
label: t('flink.app.appConf'),
component: 'Switch',
ifShow: ({ values }) =>
- values?.jobType == JobTypeEnum.SQL &&
!isK8sDeployMode(values.deployMode),
+ (values?.jobType == JobTypeEnum.SQL || values?.jobType ==
JobTypeEnum.CDC) && !isK8sDeployMode(values.deployMode),
render({ model, field }) {
return renderIsSetConfig(model, field, registerConfDrawer,
openConfDrawer);
},
@@ -477,7 +477,7 @@ export const useCreateAndEditSchema = (
component: 'InputTextArea',
defaultValue: '',
slot: 'args',
- ifShow: ({ values }) => (edit?.mode ? true : values.jobType !=
JobTypeEnum.SQL),
+ ifShow: ({ values }) => (edit?.mode ? true : (values.jobType !=
JobTypeEnum.SQL && values.jobType != JobTypeEnum.CDC)),
},
{
field: 'hadoopUser',
@@ -510,10 +510,12 @@ export const useCreateAndEditSchema = (
icon: 'ant-design:code-outlined',
style: { color: '#108ee9' },
}),
- h('span', { class: 'pl-8px' }, 'Flink JAR'),
+ h('span', { class: 'pl-8px' }, 'Custom Code'),
],
},
);
+ } else if(model.jobType == JobTypeEnum.CDC) {
+ return getAlertSvgIcon('fql', 'Flink CDC');
} else {
return getAlertSvgIcon('fql', 'Flink SQL');
}
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
index 59bdad48a..348ad5d38 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
@@ -39,8 +39,8 @@ const getJobTypeOptions = () => {
return [
{
label: h('div', {}, [
- h(SvgIcon, { name: 'fjar', color: '#108ee9' }, ''),
- h('span', { class: 'pl-10px' }, 'Flink JAR'),
+ h(SvgIcon, { name: 'code', color: '#108ee9' }, ''),
+ h('span', { class: 'pl-10px' }, 'Custom Code'),
]),
value: String(JobTypeEnum.JAR),
},
@@ -58,6 +58,13 @@ const getJobTypeOptions = () => {
]),
value: String(JobTypeEnum.PYFLINK),
},
+ {
+ label: h('div', {}, [
+ h(SvgIcon, { name: 'fql', color: '#108ee9' }, ''),
+ h('span', { class: 'pl-10px' }, 'Flink CDC'),
+ ]),
+ value: String(JobTypeEnum.CDC),
+ },
];
};
@@ -108,7 +115,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
placeholder: t('flink.app.addAppTips.jobTypePlaceholder'),
options: getJobTypeOptions(),
onChange: (value) => {
- if (value != JobTypeEnum.SQL) {
+ if (value != JobTypeEnum.SQL && value != JobTypeEnum.CDC) {
formModel.resourceFrom = String(ResourceFromEnum.PROJECT);
}
},
@@ -126,7 +133,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
component: 'Select',
render: ({ model }) => renderResourceFrom(model),
rules: [{ required: true, message:
t('flink.app.addAppTips.resourceFromMessage') }],
- show: ({ values }) => values?.jobType != JobTypeEnum.SQL,
+ show: ({ values }) => values?.jobType != JobTypeEnum.SQL &&
values?.jobType != JobTypeEnum.CDC,
},
{
field: 'uploadJobJar',
@@ -134,7 +141,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
component: 'Select',
render: ({ model }) => renderStreamParkJarApp({ model, resources:
unref(teamResource) }),
ifShow: ({ values }) =>
- values?.jobType != JobTypeEnum.SQL && values?.resourceFrom ==
ResourceFromEnum.UPLOAD,
+ values?.jobType != JobTypeEnum.SQL && values?.jobType !=
JobTypeEnum.CDC && values?.resourceFrom == ResourceFromEnum.UPLOAD,
},
{
field: 'mainClass',
@@ -142,7 +149,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
component: 'Input',
componentProps: { placeholder:
t('flink.app.addAppTips.mainClassPlaceholder') },
ifShow: ({ values }) =>
- values?.jobType != JobTypeEnum.SQL && values?.resourceFrom ==
ResourceFromEnum.UPLOAD,
+ values?.jobType != JobTypeEnum.SQL && values?.jobType !=
JobTypeEnum.CDC && values?.resourceFrom == ResourceFromEnum.UPLOAD,
rules: [{ required: true, message:
t('flink.app.addAppTips.mainClassIsRequiredMessage') }],
},
{
@@ -170,7 +177,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
},
},
ifShow: ({ values }) =>
- values?.jobType != JobTypeEnum.SQL && values.resourceFrom !=
ResourceFromEnum.UPLOAD,
+ values?.jobType != JobTypeEnum.SQL && values?.jobType !=
JobTypeEnum.CDC && values.resourceFrom != ResourceFromEnum.UPLOAD,
rules: [{ required: true, message:
t('flink.app.addAppTips.projectIsRequiredMessage') }],
},
{
@@ -193,8 +200,8 @@ export const useCreateSchema = (dependencyRef: Ref) => {
},
};
},
- ifShow: ({ values }) =>
- values?.jobType != JobTypeEnum.SQL && values?.resourceFrom !=
ResourceFromEnum.UPLOAD,
+ ifShow: ({ values }) =>
+ values?.jobType != JobTypeEnum.SQL && values?.jobType !=
JobTypeEnum.CDC && values?.resourceFrom != ResourceFromEnum.UPLOAD,
rules: [{ required: true, message:
t('flink.app.addAppTips.projectIsRequiredMessage') }],
},
{
@@ -225,7 +232,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
};
},
ifShow: ({ values }) =>
- values?.jobType != JobTypeEnum.SQL && values?.resourceFrom !=
ResourceFromEnum.UPLOAD,
+ values?.jobType != JobTypeEnum.SQL && values?.jobType !=
JobTypeEnum.CDC && values?.resourceFrom != ResourceFromEnum.UPLOAD,
dynamicRules: () => [
{ required: true, message:
t('flink.app.addAppTips.appTypeIsRequiredMessage') },
],
@@ -251,6 +258,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
},
ifShow: ({ values }) =>
values?.jobType != JobTypeEnum.SQL &&
+ values?.jobType != JobTypeEnum.CDC &&
values?.resourceFrom != ResourceFromEnum.UPLOAD &&
values.appType == String(AppTypeEnum.APACHE_FLINK),
rules: [{ required: true, message:
t('flink.app.addAppTips.programJarIsRequiredMessage') }],
@@ -262,6 +270,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
componentProps: { placeholder:
t('flink.app.addAppTips.mainClassPlaceholder') },
ifShow: ({ values }) =>
values?.jobType != JobTypeEnum.SQL &&
+ values?.jobType != JobTypeEnum.CDC &&
values?.resourceFrom != ResourceFromEnum.UPLOAD &&
values.appType == String(AppTypeEnum.APACHE_FLINK),
rules: [{ required: true, message:
t('flink.app.addAppTips.mainClassIsRequiredMessage') }],
@@ -289,6 +298,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
},
ifShow: ({ values }) =>
values?.jobType != JobTypeEnum.SQL &&
+ values?.jobType != JobTypeEnum.CDC &&
values?.resourceFrom != ResourceFromEnum.UPLOAD &&
values.appType == String(AppTypeEnum.STREAMPARK_FLINK),
dynamicRules: () => [{ required: true, validator: handleCheckConfig }],