This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch customcode-deps
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 0a314c5a8f63acb2059a2e8799643ca5477887eb
Author: benjobs <[email protected]>
AuthorDate: Thu Oct 26 12:23:17 2023 +0800

    [Improve] custom-code job support dependencies
---
 .../console/core/entity/Application.java | 17 ++-------
 .../core/service/impl/AppBuildPipeServiceImpl.java | 41 ++++++++++-----------
 .../core/service/impl/ApplicationServiceImpl.java | 22 ++++++++++-
 .../src/main/resources/db/data-h2.sql | 2 +-
 .../src/main/resources/db/schema-h2.sql | 1 +
 .../resources/mapper/core/ApplicationMapper.xml | 1 +
 .../Application/src/AppDarkModeToggle.vue | 4 +-
 .../src/components/ContextMenu/src/ContextMenu.vue | 4 +-
 .../src/components/Form/src/BasicForm.vue | 2 +-
 .../src/components/Page/src/PageFooter.vue | 4 +-
 .../src/hooks/web/useLockPage.ts | 9 ++---
 .../streampark-console-webapp/src/utils/props.ts | 2 +-
 .../src/views/flink/app/Add.vue | 25 +++++++------
 .../src/views/flink/app/EditFlink.vue | 30 ++++++++++++++-
 .../src/views/flink/app/EditStreamPark.vue | 2 +-
 .../flink/app/hooks/useCreateAndEditSchema.ts | 16 ++++++--
 .../src/views/flink/app/hooks/useCreateSchema.ts | 12 +-----
 .../views/flink/app/hooks/useEditFlinkSchema.ts | 8 ++++
 .../src/views/flink/app/hooks/useFlinkRender.tsx | 2 +-
 .../streampark/flink/packer/maven/MavenTool.scala | 43 +++++++++++++++-------
 20 files changed, 149 insertions(+), 98 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 81f4ccc4c..3e579c4ed 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -220,7 +220,7 @@ public class Application implements Serializable {

   /** running job */
   private transient JobsOverview.Task overview;

-  private transient String dependency;
+  private String dependency;
   private transient Long sqlId;
   private transient String flinkSql;
@@ -390,15 +390,6 @@ public class Application implements Serializable {
         && this.cpFailureAction != null;
   }

-  public boolean eqFlinkJob(Application other) {
-    if (this.isFlinkSqlJob() && other.isFlinkSqlJob()) {
-      if (this.getFlinkSql().trim().equals(other.getFlinkSql().trim())) {
-        return this.getDependencyObject().eq(other.getDependencyObject());
-      }
-    }
-    return false;
-  }
-
   /** Local compilation and packaging working directory */
   @JsonIgnore
   public String getDistHome() {
@@ -752,11 +743,11 @@ public class Application implements Serializable {

     @Override
     public String toString() {
-      return groupId + ":" + artifactId + ":" + version + getClassifier(":");
+      return groupId + ":" + artifactId + ":" + version + getClassifier();
     }

-    private String getClassifier(String joiner) {
-      return StringUtils.isEmpty(classifier) ? "" : joiner + classifier;
+    private String getClassifier() {
+      return StringUtils.isEmpty(classifier) ?
"" : ":" + classifier; } } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 6f8890bd2..8dc14141d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -184,32 +184,24 @@ public class AppBuildPipeServiceImpl if (app.isCustomCodeJob()) { // customCode upload jar to appHome... - String appHome = app.getAppHome(); FsOperator fsOperator = app.getFsOperator(); - fsOperator.delete(appHome); - if (app.isUploadJob()) { + if (app.isCICDJob()) { + String appHome = app.getAppHome(); + fsOperator.mkCleanDirs(appHome); + fsOperator.upload(app.getDistHome(), appHome); + } else { File localJar = new File(WebUtils.getAppTempDir(), app.getJar()); // upload jar copy to appHome String uploadJar = appUploads.concat("/").concat(app.getJar()); checkOrElseUploadJar(app.getFsOperator(), localJar, uploadJar, appUploads); - switch (app.getApplicationType()) { - case STREAMPARK_FLINK: - fsOperator.mkdirs(app.getAppLib()); - fsOperator.copy(uploadJar, app.getAppLib(), false, true); - break; - case APACHE_FLINK: - fsOperator.mkdirs(appHome); - fsOperator.copy(uploadJar, appHome, false, true); - break; - default: - throw new IllegalArgumentException( - "[StreamPark] unsupported ApplicationType of custom code: " - + app.getApplicationType()); + if (app.getApplicationType() == ApplicationType.STREAMPARK_FLINK) { + fsOperator.mkdirs(app.getAppLib()); + fsOperator.copy(uploadJar, app.getAppLib(), false, true); } - } else { - fsOperator.upload(app.getDistHome(), appHome); } - } else { + } + + if (app.isFlinkSqlJob() || app.isUploadJob()) { if (!app.getDependencyObject().getJar().isEmpty()) { String localUploads = Workspace.local().APP_UPLOADS(); // copy jar to local upload dir @@ -335,7 +327,8 @@ public class AppBuildPipeServiceImpl FlinkEnv flinkEnv = flinkEnvService.getByIdOrDefault(app.getVersionId()); String flinkUserJar = retrieveFlinkUserJar(flinkEnv, app); ExecutionMode executionMode = app.getExecutionModeEnum(); - String mainClass = ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS(); + String mainClass = + app.isCustomCodeJob() ? 
app.getMainClass() : ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS(); switch (executionMode) { case YARN_APPLICATION: String yarnProvidedPath = app.getAppLib(); @@ -358,13 +351,17 @@ public class AppBuildPipeServiceImpl case YARN_PER_JOB: case YARN_SESSION: case REMOTE: + boolean skipBuild = app.isCustomCodeJob(); + if (skipBuild && app.isUploadJob()) { + skipBuild = app.getDependencyObject().isEmpty(); + } FlinkRemotePerJobBuildRequest buildRequest = new FlinkRemotePerJobBuildRequest( app.getJobName(), app.getLocalAppHome(), mainClass, flinkUserJar, - app.isCustomCodeJob(), + skipBuild, app.getExecutionModeEnum(), app.getDevelopmentMode(), flinkEnv.getFlinkVersion(), @@ -424,7 +421,7 @@ public class AppBuildPipeServiceImpl case STREAMPARK_FLINK: return String.format("%s/%s", app.getAppLib(), app.getModule().concat(".jar")); case APACHE_FLINK: - return String.format("%s/%s", app.getAppHome(), app.getJar()); + return String.format("%s/%s", WebUtils.getAppTempDir(), app.getJar()); default: throw new IllegalArgumentException( "[StreamPark] unsupported ApplicationType of custom code: " diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 2242b897e..4dc07ebab 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -703,6 +703,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli public boolean create(Application appParam) { ApiAlertException.throwIfNull( appParam.getTeamId(), "The teamId can't be null. 
Create application failed."); + appParam.setBuild(true); appParam.setUserId(commonService.getUserId()); appParam.setState(FlinkAppState.ADDED.getValue()); appParam.setRelease(ReleaseState.NEED_RELEASE.get()); @@ -736,6 +737,17 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli } } + @Override + public boolean save(Application entity) { + String dependency = entity.getDependency(); + if (entity.isFlinkSqlJob()) { + entity.setDependency(null); + } + boolean flag = super.save(entity); + entity.setDependency(dependency); + return flag; + } + private boolean existsByJobName(String jobName) { return this.baseMapper.existsByJobName(jobName); } @@ -837,7 +849,15 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli application.setRelease(ReleaseState.NEED_RELEASE.get()); if (application.isUploadJob()) { - if (!ObjectUtils.safeEquals(application.getJar(), appParam.getJar())) { + Application.Dependency thisDependency = + Application.Dependency.toDependency(appParam.getDependency()); + Application.Dependency targetDependency = + Application.Dependency.toDependency(application.getDependency()); + + boolean depDifference = !thisDependency.eq(targetDependency); + if (depDifference) { + application.setBuild(true); + } else if (!ObjectUtils.safeEquals(application.getJar(), appParam.getJar())) { application.setBuild(true); } else { File jarFile = new File(WebUtils.getAppTempDir(), appParam.getJar()); diff --git a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql index 77494579a..7a450f129 100644 --- a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql +++ b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql @@ -25,7 +25,7 @@ insert into `t_team` values (100001, 'test', 'The test team', now(), now()); -- ---------------------------- -- Records of t_flink_app -- ---------------------------- -insert into `t_flink_app` values (100000, 100000, 2, 4, null, null, 'Flink SQL Demo', null, null, null, null, null, null , null, 100000, null, 1, null, null, null, null, null, null, null, '0', 0, null, null, null, null, null, null, 'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0, now(), now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null, 'streampark,test'); +insert into `t_flink_app` values (100000, 100000, 2, 4, null, null, 'Flink SQL Demo', null, null, null, null, null, null, null , null, 100000, null, 1, null, null, null, null, null, null, null, '0', 0, null, null, null, null, null, null, 'Flink SQL Demo', 0, null, null, null, null, null, null, null, 0, 0, now(), now(), null, 1, 1, null, null, null, null, null, null, 0, null, null, null, 'streampark,test'); -- ---------------------------- -- Records of t_flink_effective diff --git a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql index 3f7025a83..d06f907e0 100644 --- a/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql +++ b/streampark-console/streampark-console-service/src/main/resources/db/schema-h2.sql @@ -46,6 +46,7 @@ create table if not exists `t_flink_app` ( `jar` varchar(255) default null, `jar_check_sum` bigint default null, `main_class` varchar(255) default null, + `dependency` text , `args` text, `options` text, `hot_params` text , diff 
--git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml index a6c97121c..9096158b5 100644 --- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml +++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml @@ -40,6 +40,7 @@ <result column="tracking" jdbcType="INTEGER" property="tracking"/> <result column="jar" jdbcType="VARCHAR" property="jar"/> <result column="jar_check_sum" jdbcType="VARCHAR" property="jarCheckSum"/> + <result column="dependency" jdbcType="LONGVARCHAR" property="dependency"/> <result column="main_class" jdbcType="VARCHAR" property="mainClass"/> <result column="job_id" jdbcType="VARCHAR" property="jobId"/> <result column="job_manager_url" jdbcType="VARCHAR" property="jobManagerUrl"/> diff --git a/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue b/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue index abf6c47ea..40b5c4964 100644 --- a/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue +++ b/streampark-console/streampark-console-webapp/src/components/Application/src/AppDarkModeToggle.vue @@ -79,9 +79,7 @@ height: 18px; background-color: #fff; border-radius: 50%; - transition: - transform 0.5s, - background-color 0.5s; + transition: transform 0.5s, background-color 0.5s; will-change: transform; } diff --git a/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue b/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue index 61cc99b7f..122b6e711 100644 --- a/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue +++ b/streampark-console/streampark-console-webapp/src/components/ContextMenu/src/ContextMenu.vue @@ -195,9 +195,7 @@ background-color: @component-background; border: 1px solid rgb(0 0 0 / 8%); border-radius: 0.25rem; - box-shadow: - 0 2px 2px 0 rgb(0 0 0 / 14%), - 0 3px 1px -2px rgb(0 0 0 / 10%), + box-shadow: 0 2px 2px 0 rgb(0 0 0 / 14%), 0 3px 1px -2px rgb(0 0 0 / 10%), 0 1px 5px 0 rgb(0 0 0 / 6%); background-clip: padding-box; user-select: none; diff --git a/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue b/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue index ab4c10638..0169b4c86 100644 --- a/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue +++ b/streampark-console/streampark-console-webapp/src/components/Form/src/BasicForm.vue @@ -129,7 +129,7 @@ }); const getBindValue = computed( - () => ({ ...attrs, ...props, ...unref(getProps) }) as Recordable, + () => ({ ...attrs, ...props, ...unref(getProps) } as Recordable), ); const getSchema = computed((): FormSchema[] => { diff --git a/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue b/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue index c499ccf65..153302646 100644 --- a/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue +++ b/streampark-console/streampark-console-webapp/src/components/Page/src/PageFooter.vue @@ -55,9 +55,7 @@ line-height: 44px; background-color: @component-background; border-top: 1px solid 
@border-color-base; - box-shadow: - 0 -6px 16px -8px rgb(0 0 0 / 8%), - 0 -9px 28px 0 rgb(0 0 0 / 5%), + box-shadow: 0 -6px 16px -8px rgb(0 0 0 / 8%), 0 -9px 28px 0 rgb(0 0 0 / 5%), 0 -12px 48px 16px rgb(0 0 0 / 3%); transition: width 0.2s; diff --git a/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts b/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts index 47aa6eb5f..de0380811 100644 --- a/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts +++ b/streampark-console/streampark-console-webapp/src/hooks/web/useLockPage.ts @@ -48,12 +48,9 @@ export function useLockPage() { } clear(); - timeId = setTimeout( - () => { - lockPage(); - }, - lockTime * 60 * 1000, - ); + timeId = setTimeout(() => { + lockPage(); + }, lockTime * 60 * 1000); } function lockPage(): void { diff --git a/streampark-console/streampark-console-webapp/src/utils/props.ts b/streampark-console/streampark-console-webapp/src/utils/props.ts index 5828c8cf7..92fe2810b 100644 --- a/streampark-console/streampark-console-webapp/src/utils/props.ts +++ b/streampark-console/streampark-console-webapp/src/utils/props.ts @@ -191,7 +191,7 @@ export const buildProps = < : never; }; -export const definePropType = <T>(val: any) => ({ [wrapperKey]: val }) as PropWrapper<T>; +export const definePropType = <T>(val: any) => ({ [wrapperKey]: val } as PropWrapper<T>); export const keyOf = <T extends Object>(arr: T) => Object.keys(arr) as Array<keyof T>; export const mutable = <T extends readonly any[] | Record<string, unknown>>(val: T) => diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue index 541503f69..071231e0c 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/Add.vue @@ -185,7 +185,7 @@ // common params... const resourceFrom = values.resourceFrom; if (resourceFrom) { - if (resourceFrom === 'csv') { + if (resourceFrom === 'cvs') { params['resourceFrom'] = ResourceFromEnum.CICD; //streampark flink if (values.appType == AppTypeEnum.STREAMPARK_FLINK) { @@ -210,15 +210,15 @@ appType: AppTypeEnum.APACHE_FLINK, jar: unref(uploadJar), mainClass: values.mainClass, + dependency: await getDependency(), }); handleCreateApp(params); } } } - /* flink sql mode */ - async function handleSubmitSQL(values: Recordable) { + async function getDependency() { // Trigger a pom confirmation operation. - await unref(dependencyRef)?.handleApplyPom(); + unref(dependencyRef)?.handleApplyPom(); // common params... const dependency: { pom?: string; jar?: string } = {}; const dependencyRecords = unref(dependencyRef)?.dependencyRecords; @@ -233,14 +233,18 @@ jar: unref(uploadJars), }); } - + return dependency.pom === undefined && dependency.jar === undefined + ? null + : JSON.stringify(dependency); + } + /* flink sql mode */ + async function handleSubmitSQL(values: Recordable) { let config = values.configOverride; - if (config != null && config !== undefined && config.trim() != '') { + if (config != null && config.trim() != '') { config = encryptByBase64(config); } else { config = null; } - handleCluster(values); const params = { jobType: JobTypeEnum.SQL, @@ -248,10 +252,7 @@ appType: AppTypeEnum.STREAMPARK_FLINK, config, format: values.isSetConfig ? 1 : null, - dependency: - dependency.pom === undefined && dependency.jar === undefined - ? 
null - : JSON.stringify(dependency), + dependency: await getDependency(), }; handleSubmitParams(params, values, k8sTemplate); handleCreateApp(params); @@ -285,7 +286,7 @@ const param = {}; for (const k in params) { const v = params[k]; - if (v != null && v !== undefined) { + if (v != null) { param[k] = v; } } diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue index 021354020..265c66e52 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/EditFlink.vue @@ -39,6 +39,7 @@ import VariableReview from './components/VariableReview.vue'; import { useDrawer } from '/@/components/Drawer'; import { ExecModeEnum, ResourceFromEnum } from '/@/enums/flinkEnum'; + import Dependency from '/@/views/flink/app/components/Dependency.vue'; const route = useRoute(); const { t } = useI18n(); @@ -52,6 +53,7 @@ const uploadJar = ref(''); const programArgRef = ref(); const podTemplateRef = ref(); + const dependencyRef = ref(); const k8sTemplate = reactive({ podTemplate: '', @@ -116,6 +118,7 @@ setFieldsValue(defaultParams); app.args && programArgRef.value?.setContent(app.args); setTimeout(() => { + unref(dependencyRef)?.setDefaultValue(JSON.parse(app.dependency || '{}')); unref(podTemplateRef)?.handleChoicePodTemplate('ptVisual', app.k8sPodTemplate); unref(podTemplateRef)?.handleChoicePodTemplate('jmPtVisual', app.k8sJmPodTemplate); unref(podTemplateRef)?.handleChoicePodTemplate('tmPtVisual', app.k8sTmPodTemplate); @@ -142,15 +145,34 @@ /* Handling update parameters */ function handleAppUpdate(values: Recordable) { + // Trigger a pom confirmation operation. + unref(dependencyRef)?.handleApplyPom(); + // common params... + const dependency: { pom?: string; jar?: string } = {}; + const dependencyRecords = unref(dependencyRef)?.dependencyRecords; + const uploadJars = unref(dependencyRef)?.uploadJars; + if (unref(dependencyRecords) && unref(dependencyRecords).length > 0) { + Object.assign(dependency, { + pom: unref(dependencyRecords), + }); + } + if (uploadJars && unref(uploadJars).length > 0) { + Object.assign(dependency, { + jar: unref(uploadJars), + }); + } submitLoading.value = true; try { const params = { id: app.id, jar: values.jar, mainClass: values.mainClass, + dependency: + dependency.pom === undefined && dependency.jar === undefined + ? 
null + : JSON.stringify(dependency), }; handleSubmitParams(params, values, k8sTemplate); - handleUpdateApp(params); } catch (error) { submitLoading.value = false; @@ -212,13 +234,17 @@ <template #args="{ model }"> <ProgramArgs ref="programArgRef" - v-if="model.args != null && model.args != undefined" + v-if="model.args != null && true" v-model:value="model.args" :suggestions="suggestions" @preview="(value) => openReviewDrawer(true, { value, suggestions })" /> </template> + <template #dependency="{ model, field }"> + <Dependency ref="dependencyRef" v-model:value="model[field]" :form-model="model" /> + </template> + <template #formFooter> <div class="flex items-center w-full justify-center"> <a-button @click="go('/flink/app')"> diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue index 0d346aa10..9e307ea24 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/EditStreamPark.vue @@ -360,7 +360,7 @@ <template #args="{ model }"> <ProgramArgs ref="programArgRef" - v-if="model.args != null && model.args != undefined" + v-if="model.args != null" v-model:value="model.args" :suggestions="suggestions" @preview="(value) => openReviewDrawer(true, { value, suggestions })" diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts index a5cad757c..752ad30e0 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateAndEditSchema.ts @@ -49,7 +49,13 @@ import { fetchFlinkEnv } from '/@/api/flink/setting/flinkEnv'; import { FlinkEnv } from '/@/api/flink/setting/types/flinkEnv.type'; import { AlertSetting } from '/@/api/flink/setting/types/alert.type'; import { FlinkCluster } from '/@/api/flink/setting/types/flinkCluster.type'; -import { AppTypeEnum, ClusterStateEnum, ExecModeEnum, JobTypeEnum } from '/@/enums/flinkEnum'; +import { + AppTypeEnum, + ClusterStateEnum, + ExecModeEnum, + JobTypeEnum, + ResourceFromEnum, +} from '/@/enums/flinkEnum'; import { isK8sExecMode } from '../utils'; import { useI18n } from '/@/hooks/web/useI18n'; import { fetchCheckHadoop } from '/@/api/flink/setting'; @@ -79,7 +85,7 @@ export const useCreateAndEditSchema = ( const [registerConfDrawer, { openDrawer: openConfDrawer }] = useDrawer(); - /* + /* !The original item is also unassigned */ function getConfigSchemas() { @@ -126,9 +132,11 @@ export const useCreateAndEditSchema = ( slot: 'dependency', ifShow: ({ values }) => { if (edit?.appId) { - return values.jobType == JobTypeEnum.SQL; + return values.jobType == JobTypeEnum.SQL + ? true + : values.resourceFrom == ResourceFromEnum.UPLOAD; } else { - return values?.jobType == 'sql'; + return values?.jobType == 'sql' ? 
true : values?.resourceFrom != 'cvs'; } }, }, diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts index 72b11762f..4ef2557b3 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useCreateSchema.ts @@ -63,14 +63,6 @@ export const useCreateSchema = (dependencyRef: Ref) => { suggestions, } = useCreateAndEditSchema(dependencyRef); - // async function handleEditConfig(config: string) { - // console.log('config', config); - // const res = await fetchAppConf({ config }); - // const conf = decodeByBase64(res); - // openConfDrawer(true, { - // configOverride: conf, - // }); - // } function handleCheckConfig(_rule: RuleObject, value: StoreValue) { if (value) { const confType = getAppConfType(value); @@ -97,7 +89,7 @@ export const useCreateSchema = (dependencyRef: Ref) => { if (value === 'sql') { formModel.tableEnv = 1; } else { - formModel.resourceFrom = 'csv'; + formModel.resourceFrom = 'cvs'; } }, }; @@ -109,7 +101,6 @@ export const useCreateSchema = (dependencyRef: Ref) => { }, ...getExecutionModeSchema.value, ...getFlinkClusterSchemas.value, - ...getFlinkSqlSchema.value, { field: 'resourceFrom', label: t('flink.app.resourceFrom'), @@ -133,6 +124,7 @@ export const useCreateSchema = (dependencyRef: Ref) => { ifShow: ({ values }) => values?.jobType !== 'sql' && values?.resourceFrom == 'upload', rules: [{ required: true, message: t('flink.app.addAppTips.mainClassIsRequiredMessage') }], }, + ...getFlinkSqlSchema.value, { field: 'project', label: t('flink.app.project'), diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useEditFlinkSchema.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useEditFlinkSchema.ts index aef6f6f81..c7e072ad8 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useEditFlinkSchema.ts +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useEditFlinkSchema.ts @@ -23,6 +23,8 @@ import { Alert } from 'ant-design-vue'; import { useRoute } from 'vue-router'; import { fetchMain } from '/@/api/flink/app/app'; import { ResourceFromEnum } from '/@/enums/flinkEnum'; +import { useI18n } from '/@/hooks/web/useI18n'; +const { t } = useI18n(); export const useEditFlinkSchema = (jars: Ref) => { const flinkSql = ref(); @@ -115,6 +117,12 @@ export const useEditFlinkSchema = (jars: Ref) => { }, rules: [{ required: true, message: 'Program Main is required' }], }, + { + field: 'dependency', + label: t('flink.app.dependency'), + component: 'Input', + slot: 'dependency', + }, ...getFlinkFormOtherSchemas.value, ]; }); diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx index 4a103f81b..ba00e667a 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useFlinkRender.tsx @@ -514,7 +514,7 @@ export const renderResourceFrom = (model: Recordable) => { value={model.resourceFrom} placeholder="Please select resource from" > - <Select.Option value="csv"> + <Select.Option value="cvs"> <SvgIcon name="github" /> <span class="pl-10px">CICD</span> <span class="gray">(build 
from CVS)</span> diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala index 9c95748dd..5dab91892 100644 --- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala +++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala @@ -46,7 +46,7 @@ import java.util import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer -import scala.util.Try +import scala.util.{Failure, Success, Try} object MavenTool extends Logger { @@ -76,9 +76,6 @@ object MavenTool extends Logger { List(remoteRepository) } - private val isJarFile = (file: File) => - file.isFile && Try(Utils.checkJarFile(file.toURI.toURL)).isSuccess - /** * Build a fat-jar with custom jar libraries. * @@ -102,16 +99,34 @@ object MavenTool extends Logger { uberJar.delete() // resolve all jarLibs val jarSet = new util.HashSet[File] - jarLibs - .map(lib => new File(lib)) - .filter(_.exists) - .foreach { - case libFile if isJarFile(libFile) => jarSet.add(libFile) - case libFile if libFile.isDirectory => - libFile.listFiles.filter(isJarFile).foreach(jarSet.add) - case _ => - } - logInfo(s"start shaded fat-jar: ${jarLibs.mkString(",")}") + jarLibs.foreach { + x => + new File(x) match { + case jarFile if jarFile.exists() => + if (jarFile.isFile) { + Try(Utils.checkJarFile(jarFile.toURI.toURL)) match { + case Success(_) => jarSet.add(jarFile) + case Failure(e) => logWarn(s"buildFatJar: error, ${e.getMessage}") + } + } else { + jarFile.listFiles.foreach( + jar => { + if (jar.isFile) { + Try(Utils.checkJarFile(jar.toURI.toURL)) match { + case Success(_) => jarSet.add(jar) + case Failure(e) => + logWarn( + s"buildFatJar: directory [${jarFile.getAbsolutePath}], error: ${e.getMessage}") + } + } + }) + } + case _ => + } + } + + logInfo(s"start shaded fat-jar: ${jarSet.mkString(",")}") + // shade jars val shadeRequest = { val req = new ShadeRequest
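
For readers of this change, a minimal TypeScript sketch of the dependency payload it introduces: getDependency() in Add.vue (and the inline equivalent added to EditFlink.vue) serializes a { pom, jar } object into the new `dependency` column, which the server reads back with Application.Dependency.toDependency(). The MavenPom / DependencyPayload / toDependencyParam names and the sample coordinates below are illustrative assumptions, not identifiers from the commit.

// Sketch only: field shapes mirror the { pom?: string; jar?: string } object
// assembled in Add.vue and the coordinate fields used by the server-side
// dependency classes; all names here are illustrative.
interface MavenPom {
  groupId: string;
  artifactId: string;
  version: string;
  classifier?: string; // appended as ":<classifier>" by the coordinate class's getClassifier() shown in the diff
}

interface DependencyPayload {
  pom?: MavenPom[]; // Maven coordinates resolved when shading the fat-jar (MavenTool)
  jar?: string[]; // uploaded jar names copied from the local uploads dir at build time
}

function toDependencyParam(pomRecords: MavenPom[], uploadJars: string[]): string | null {
  const dependency: DependencyPayload = {};
  if (pomRecords.length > 0) {
    dependency.pom = pomRecords;
  }
  if (uploadJars.length > 0) {
    dependency.jar = uploadJars;
  }
  // As in Add.vue/EditFlink.vue: send null when nothing was selected, otherwise a
  // JSON string that is stored in the new `dependency` column and parsed back with
  // Application.Dependency.toDependency().
  return dependency.pom === undefined && dependency.jar === undefined
    ? null
    : JSON.stringify(dependency);
}

// Hypothetical usage:
const dependencyParam = toDependencyParam(
  [{ groupId: 'org.apache.flink', artifactId: 'flink-connector-kafka', version: '1.17.1' }],
  ['my-udf.jar'],
);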
