This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 83d741ac2 [Feature] The front-end is modified to support the cdc yaml 
api (#4177)
83d741ac2 is described below

commit 83d741ac2b534f7b715bde816f988781fd4bacea
Author: ouyangwulin <[email protected]>
AuthorDate: Mon Jan 20 21:14:59 2025 +0800

    [Feature] The front-end is modified to support the cdc yaml api (#4177)
---
 .../streampark-console-webapp/package.json         |  6 +++--
 .../src/enums/flinkEnum.ts                         |  1 +
 .../src/locales/lang/en/flink/app.ts               |  2 ++
 .../src/locales/lang/zh-CN/flink/app.ts            |  2 ++
 .../src/views/flink/app/Add.vue                    |  9 ++++---
 .../src/views/flink/app/EditStreamPark.vue         | 10 +++++---
 .../src/views/flink/app/View.vue                   |  3 +++
 .../src/views/flink/app/components/FlinkSql.vue    | 27 ++++++++++++++++++-
 .../src/views/flink/app/hooks/useAppTableAction.ts |  3 ++-
 .../flink/app/hooks/useCreateAndEditSchema.ts      | 12 +++++----
 .../src/views/flink/app/hooks/useCreateSchema.ts   | 30 ++++++++++++++--------
 11 files changed, 79 insertions(+), 26 deletions(-)

diff --git a/streampark-console/streampark-console-webapp/package.json 
b/streampark-console/streampark-console-webapp/package.json
index 9e3ce9bac..d7cfeb0fd 100644
--- a/streampark-console/streampark-console-webapp/package.json
+++ b/streampark-console/streampark-console-webapp/package.json
@@ -65,7 +65,8 @@
     "vue": "^3.3.4",
     "vue-i18n": "^9.2.2",
     "vue-router": "^4.2.4",
-    "vue-types": "^5.1.0"
+    "vue-types": "^5.1.0",
+    "js-yaml": "^4.1.0"
   },
   "devDependencies": {
     "@iconify/json": "^2.2.89",
@@ -118,7 +119,8 @@
     "vite-plugin-theme": "^0.8.6",
     "vite-plugin-windicss": "^1.9.1",
     "vue-eslint-parser": "^9.3.1",
-    "vue-tsc": "^1.8.4"
+    "vue-tsc": "^1.8.4",
+    "js-yaml": "^4.1.0"
   },
   "engines": {
     "node": ">=16.15.1 <= 18",
diff --git 
a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts 
b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
index 7c078d5e5..d7e72ff6c 100644
--- a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
+++ b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
@@ -151,6 +151,7 @@ export enum JobTypeEnum {
   JAR = 1,
   SQL = 2,
   PYFLINK = 3,
+  CDC = 4,
 }
 
 export enum ConfigTypeEnum {
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts 
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
index ab0bc56b8..7840b7da4 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
+++ 
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
@@ -205,8 +205,10 @@ export default {
   editStreamPark: {
     success: 'update successful',
     flinkSqlRequired: 'Flink Sql is required',
+    yamlRequired: 'Yaml is required',
     appidCheck: 'appid can not be empty',
     sqlCheck: 'SQL check error',
+    yamlCheck: 'Yaml check error',
   },
   operation: {
     edit: 'Edit Job',
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
index 11a8b417a..5ba959233 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
+++ 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
@@ -195,6 +195,8 @@ export default {
     flinkSqlRequired: 'Flink Sql 为必填项',
     appidCheck: 'appid 不能为空',
     sqlCheck: 'SQL 检查错误',
+    yamlRequired: 'Yaml 不能为空',
+    yamlCheck: 'Yaml 检查错误',
   },
   operation: {
     edit: '编辑作业',
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue 
b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
index 4684f7ca0..da1910f09 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue
@@ -263,13 +263,15 @@
   async function handleAppCreate(formValue: Recordable) {
     try {
       submitLoading.value = true;
-      if (formValue.jobType == JobTypeEnum.SQL) {
+      if (formValue.jobType == JobTypeEnum.SQL || formValue.jobType == 
JobTypeEnum.CDC) {
         if (formValue.flinkSql == null || formValue.flinkSql.trim() === '') {
-          
createMessage.warning(t('flink.app.editStreamPark.flinkSqlRequired'));
+          const errorMsg = formValue.jobType == JobTypeEnum.SQL ? 
t('flink.app.editStreamPark.flinkSqlRequired') : 
t('flink.app.editStreamPark.yamlRequired')
+          createMessage.warning(errorMsg);
         } else {
           const access = await flinkSql?.value?.handleVerifySql();
           if (!access) {
-            createMessage.warning(t('flink.app.editStreamPark.sqlCheck'));
+            const errorMsg = formValue.jobType == JobTypeEnum.SQL ? 
t('flink.app.editStreamPark.sqlCheck') : t('flink.app.editStreamPark.yamlCheck')
+            createMessage.warning(errorMsg);
             throw new Error(access);
           }
         }
@@ -316,6 +318,7 @@
           v-model:value="model[field]"
           :versionId="model['versionId']"
           :suggestions="suggestions"
+          :jobType="Number(model['jobType'])"
           @preview="(value) => openReviewDrawer(true, { value, suggestions })"
         />
       </template>
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
index 2cc118ceb..1180730bf 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue
@@ -164,13 +164,15 @@
   async function handleAppUpdate(values) {
     try {
       submitLoading.value = true;
-      if (app.jobType == JobTypeEnum.SQL) {
+      if (app.jobType == JobTypeEnum.SQL || app.jobType == JobTypeEnum.CDC) {
         if (values.flinkSql == null || values.flinkSql.trim() === '') {
-          
createMessage.warning(t('flink.app.editStreamPark.flinkSqlRequired'));
+          const errorMsg = app.jobType == JobTypeEnum.SQL ? 
t('flink.app.editStreamPark.flinkSqlRequired') : 
t('flink.app.editStreamPark.yamlRequired')
+          createMessage.warning(errorMsg);
         } else {
           const access = await flinkSql?.value?.handleVerifySql();
           if (!access) {
-            createMessage.warning(t('flink.app.editStreamPark.sqlCheck'));
+            const errorMsg = app.jobType == JobTypeEnum.SQL ? 
t('flink.app.editStreamPark.sqlCheck') : t('flink.app.editStreamPark.yamlCheck')
+            createMessage.warning(errorMsg);
             throw new Error(access);
           }
           handleSubmitSQL(values);
@@ -283,7 +285,7 @@
     Object.assign(app, res);
     Object.assign(defaultOptions, JSON.parse(app.options || '{}'));
 
-    if (app.jobType == JobTypeEnum.SQL) {
+    if (app.jobType == JobTypeEnum.SQL || app.jobType == JobTypeEnum.CDC) {
       fetchFlinkHistory({ id: appId }).then((res) => {
         flinkSqlHistory.value = res;
       });
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue 
b/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue
index 99cea85a2..e33a45a82 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue
@@ -310,6 +310,7 @@
                       :options="[
                         { label: 'JAR', value: JobTypeEnum.JAR },
                         { label: 'SQL', value: JobTypeEnum.SQL },
+                        { label: 'FlinkCDC', value: JobTypeEnum.CDC },
                       ]"
                     />
                   </Form.Item>
@@ -349,6 +350,7 @@
           <template v-if="column.dataIndex === 'jobName'">
             <span class="app_type app_jar" v-if="record['jobType'] === 
JobTypeEnum.JAR"> JAR </span>
             <span class="app_type app_sql" v-if="record['jobType'] === 
JobTypeEnum.SQL"> SQL </span>
+            <span class="app_type app_sql" v-if="record['jobType'] === 
JobTypeEnum.CDC"> CDC </span>
             <span class="link cursor-pointer" @click="handleJobView(record)">
               <Popover :title="t('common.detailText')">
                 <template #content>
@@ -361,6 +363,7 @@
                     <Tag color="blue">
                       <span v-if="record['jobType'] == JobTypeEnum.JAR"> JAR 
</span>
                       <span v-if="record['jobType'] == JobTypeEnum.SQL"> SQL 
</span>
+                      <span v-if="record['jobType'] == JobTypeEnum.CDC"> 
FlinkCDC </span>
                     </Tag>
                   </div>
                   <div class="pt-2px flex">
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/FlinkSql.vue
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/FlinkSql.vue
index 865709c7f..05d94cc00 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/FlinkSql.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/FlinkSql.vue
@@ -35,6 +35,8 @@
   import { format } from '../FlinkSqlFormatter';
   import { useI18n } from '/@/hooks/web/useI18n';
   import { useFullContent } from '/@/hooks/event/useFullscreen';
+  import { JobTypeEnum } from '/@/enums/flinkEnum';
+  import YAML from 'js-yaml';
   const ButtonGroup = Button.Group;
   const { t } = useI18n();
 
@@ -65,6 +67,9 @@
       type: Array as PropType<Array<{ text: string; description: string }>>,
       default: () => [],
     },
+    jobType: {
+      type: Number
+    }
   });
   const defaultValue = '';
 
@@ -79,7 +84,23 @@
       createMessage.error(t('flink.app.dependencyError'));
       return false;
     } else {
-      try {
+      console.log(props.jobType)
+      if (props.jobType === JobTypeEnum.CDC) {
+        try {
+          YAML.load(props.value);
+          verifyRes.verified = true;
+          verifyRes.errorMsg = '';
+          syntaxError();
+          return true;
+        } catch (error) {
+          verifyRes.errorStart = 0;
+          verifyRes.errorEnd = 0;
+          verifyRes.errorMsg = `${error.name}: ${error.reason} at line 
${error.mark.line},cloumn ${error.mark.column}`;
+          syntaxError();
+          return false;
+        }
+      } else {
+        try {
         const { data } = await fetchFlinkSqlVerify({
           sql: props.value,
           versionId: props.versionId,
@@ -111,6 +132,7 @@
         console.error(error);
         return false;
       }
+      }
     }
   }
 
@@ -140,6 +162,9 @@
   /* format */
   function handleFormatSql() {
     if (isEmpty(props.value)) return;
+    if (props.jobType === JobTypeEnum.CDC) {
+      return false;
+    }
     const formatSql = format(props.value);
     setContent(formatSql);
   }
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts
index c65ca155f..7fc1130bc 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts
@@ -232,7 +232,7 @@ export const useAppTableAction = (
     sessionStorage.setItem('appPageNo', String(currentPageNo || 1));
     flinkAppStore.setApplicationId(app.id);
     if (app.appType == AppTypeEnum.STREAMPARK_FLINK) {
-      // jobType( 1 flinkJar 2: flinkSQL)
+      // jobType( 1 custom code 2: flinkSQL)
       router.push({ path: '/flink/app/edit_streampark', query: { appId: app.id 
} });
     } else if (app.appType == AppTypeEnum.APACHE_FLINK) {
       //Apache Flink
@@ -314,6 +314,7 @@ export const useAppTableAction = (
               { label: 'JAR', value: JobTypeEnum.JAR },
               { label: 'SQL', value: JobTypeEnum.SQL },
               { label: 'PYFLINK', value: JobTypeEnum.PYFLINK },
+              { label: 'FLINK CDC', value: JobTypeEnum.CDC },
             ],
             onChange: handlePageDataReload.bind(null, false),
           },
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
index 62f20eab5..25e0f1995 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts
@@ -109,7 +109,7 @@ export const useCreateAndEditSchema = (
         label: 'Flink SQL',
         component: 'Input',
         slot: 'flinkSql',
-        ifShow: ({ values }) => values?.jobType == JobTypeEnum.SQL,
+        ifShow: ({ values }) => values?.jobType == JobTypeEnum.SQL || 
values?.jobType == JobTypeEnum.CDC,
         rules: [{ required: true, message: 
t('flink.app.addAppTips.flinkSqlIsRequiredMessage') }],
       },
       {
@@ -117,7 +117,7 @@ export const useCreateAndEditSchema = (
         label: t('flink.app.resource'),
         component: 'Select',
         render: ({ model }) => renderStreamParkResource({ model, resources: 
unref(teamResource) }),
-        ifShow: ({ values }) => values.jobType == JobTypeEnum.SQL,
+        ifShow: ({ values }) => values.jobType == JobTypeEnum.SQL || 
values.jobType == JobTypeEnum.CDC,
       },
       {
         field: 'dependency',
@@ -132,7 +132,7 @@ export const useCreateAndEditSchema = (
         label: t('flink.app.appConf'),
         component: 'Switch',
         ifShow: ({ values }) =>
-          values?.jobType == JobTypeEnum.SQL && 
!isK8sDeployMode(values.deployMode),
+          (values?.jobType == JobTypeEnum.SQL || values?.jobType == 
JobTypeEnum.CDC) && !isK8sDeployMode(values.deployMode),
         render({ model, field }) {
           return renderIsSetConfig(model, field, registerConfDrawer, 
openConfDrawer);
         },
@@ -477,7 +477,7 @@ export const useCreateAndEditSchema = (
         component: 'InputTextArea',
         defaultValue: '',
         slot: 'args',
-        ifShow: ({ values }) => (edit?.mode ? true : values.jobType != 
JobTypeEnum.SQL),
+        ifShow: ({ values }) => (edit?.mode ? true : (values.jobType != 
JobTypeEnum.SQL && values.jobType != JobTypeEnum.CDC)),
       },
       {
         field: 'hadoopUser',
@@ -510,10 +510,12 @@ export const useCreateAndEditSchema = (
                     icon: 'ant-design:code-outlined',
                     style: { color: '#108ee9' },
                   }),
-                  h('span', { class: 'pl-8px' }, 'Flink JAR'),
+                  h('span', { class: 'pl-8px' }, 'Custom Code'),
                 ],
               },
             );
+          } else if(model.jobType == JobTypeEnum.CDC) {
+            return getAlertSvgIcon('fql', 'Flink CDC');
           } else {
             return getAlertSvgIcon('fql', 'Flink SQL');
           }
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
index 59bdad48a..348ad5d38 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts
@@ -39,8 +39,8 @@ const getJobTypeOptions = () => {
   return [
     {
       label: h('div', {}, [
-        h(SvgIcon, { name: 'fjar', color: '#108ee9' }, ''),
-        h('span', { class: 'pl-10px' }, 'Flink JAR'),
+        h(SvgIcon, { name: 'code', color: '#108ee9' }, ''),
+        h('span', { class: 'pl-10px' }, 'Custom Code'),
       ]),
       value: String(JobTypeEnum.JAR),
     },
@@ -58,6 +58,13 @@ const getJobTypeOptions = () => {
       ]),
       value: String(JobTypeEnum.PYFLINK),
     },
+    {
+      label: h('div', {}, [
+        h(SvgIcon, { name: 'fql', color: '#108ee9' }, ''),
+        h('span', { class: 'pl-10px' }, 'Flink CDC'),
+      ]),
+      value: String(JobTypeEnum.CDC),
+    },
   ];
 };
 
@@ -108,7 +115,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
             placeholder: t('flink.app.addAppTips.jobTypePlaceholder'),
             options: getJobTypeOptions(),
             onChange: (value) => {
-              if (value != JobTypeEnum.SQL) {
+              if (value != JobTypeEnum.SQL && value != JobTypeEnum.CDC) {
                 formModel.resourceFrom = String(ResourceFromEnum.PROJECT);
               }
             },
@@ -126,7 +133,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
         component: 'Select',
         render: ({ model }) => renderResourceFrom(model),
         rules: [{ required: true, message: 
t('flink.app.addAppTips.resourceFromMessage') }],
-        show: ({ values }) => values?.jobType != JobTypeEnum.SQL,
+        show: ({ values }) => values?.jobType != JobTypeEnum.SQL && 
values?.jobType != JobTypeEnum.CDC,
       },
       {
         field: 'uploadJobJar',
@@ -134,7 +141,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
         component: 'Select',
         render: ({ model }) => renderStreamParkJarApp({ model, resources: 
unref(teamResource) }),
         ifShow: ({ values }) =>
-          values?.jobType != JobTypeEnum.SQL && values?.resourceFrom == 
ResourceFromEnum.UPLOAD,
+          values?.jobType != JobTypeEnum.SQL && values?.jobType != 
JobTypeEnum.CDC && values?.resourceFrom == ResourceFromEnum.UPLOAD,
       },
       {
         field: 'mainClass',
@@ -142,7 +149,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
         component: 'Input',
         componentProps: { placeholder: 
t('flink.app.addAppTips.mainClassPlaceholder') },
         ifShow: ({ values }) =>
-          values?.jobType != JobTypeEnum.SQL && values?.resourceFrom == 
ResourceFromEnum.UPLOAD,
+          values?.jobType != JobTypeEnum.SQL && values?.jobType != 
JobTypeEnum.CDC && values?.resourceFrom == ResourceFromEnum.UPLOAD,
         rules: [{ required: true, message: 
t('flink.app.addAppTips.mainClassIsRequiredMessage') }],
       },
       {
@@ -170,7 +177,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
           },
         },
         ifShow: ({ values }) =>
-          values?.jobType != JobTypeEnum.SQL && values.resourceFrom != 
ResourceFromEnum.UPLOAD,
+          values?.jobType != JobTypeEnum.SQL && values?.jobType != 
JobTypeEnum.CDC && values.resourceFrom != ResourceFromEnum.UPLOAD,
         rules: [{ required: true, message: 
t('flink.app.addAppTips.projectIsRequiredMessage') }],
       },
       {
@@ -193,8 +200,8 @@ export const useCreateSchema = (dependencyRef: Ref) => {
             },
           };
         },
-        ifShow: ({ values }) =>
-          values?.jobType != JobTypeEnum.SQL && values?.resourceFrom != 
ResourceFromEnum.UPLOAD,
+        ifShow: ({ values }) => 
+          values?.jobType != JobTypeEnum.SQL && values?.jobType != 
JobTypeEnum.CDC && values?.resourceFrom != ResourceFromEnum.UPLOAD,
         rules: [{ required: true, message: 
t('flink.app.addAppTips.projectIsRequiredMessage') }],
       },
       {
@@ -225,7 +232,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
           };
         },
         ifShow: ({ values }) =>
-          values?.jobType != JobTypeEnum.SQL && values?.resourceFrom != 
ResourceFromEnum.UPLOAD,
+          values?.jobType != JobTypeEnum.SQL && values?.jobType != 
JobTypeEnum.CDC && values?.resourceFrom != ResourceFromEnum.UPLOAD,
         dynamicRules: () => [
           { required: true, message: 
t('flink.app.addAppTips.appTypeIsRequiredMessage') },
         ],
@@ -251,6 +258,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
         },
         ifShow: ({ values }) =>
           values?.jobType != JobTypeEnum.SQL &&
+          values?.jobType != JobTypeEnum.CDC &&
           values?.resourceFrom != ResourceFromEnum.UPLOAD &&
           values.appType == String(AppTypeEnum.APACHE_FLINK),
         rules: [{ required: true, message: 
t('flink.app.addAppTips.programJarIsRequiredMessage') }],
@@ -262,6 +270,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
         componentProps: { placeholder: 
t('flink.app.addAppTips.mainClassPlaceholder') },
         ifShow: ({ values }) =>
           values?.jobType != JobTypeEnum.SQL &&
+          values?.jobType != JobTypeEnum.CDC &&
           values?.resourceFrom != ResourceFromEnum.UPLOAD &&
           values.appType == String(AppTypeEnum.APACHE_FLINK),
         rules: [{ required: true, message: 
t('flink.app.addAppTips.mainClassIsRequiredMessage') }],
@@ -289,6 +298,7 @@ export const useCreateSchema = (dependencyRef: Ref) => {
         },
         ifShow: ({ values }) =>
           values?.jobType != JobTypeEnum.SQL &&
+          values?.jobType != JobTypeEnum.CDC &&
           values?.resourceFrom != ResourceFromEnum.UPLOAD &&
           values.appType == String(AppTypeEnum.STREAMPARK_FLINK),
         dynamicRules: () => [{ required: true, validator: handleCheckConfig }],

Reply via email to