This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch job_state in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 8a263348727c240eae6eeb2cebfe90ed905c05f9 Author: benjobs <[email protected]> AuthorDate: Mon Feb 12 14:33:57 2024 +0800 [Improve] Flink job status monitoring improvement --- .../main/assembly/script/upgrade/mysql/2.1.3.sql | 1 + .../ApplicationBuildPipelineController.java | 2 ++ .../core/service/impl/ApplicationServiceImpl.java | 12 ++++++++---- .../console/core/task/FlinkAppHttpWatcher.java | 21 ++++++++++++--------- .../src/main/resources/db/data-h2.sql | 1 - .../src/locales/lang/zh-CN/flink/app.ts | 6 +++--- .../src/locales/lang/zh-CN/menu.ts | 2 +- .../src/views/flink/app/View.vue | 5 ++++- .../src/views/flink/app/hooks/useAppTableAction.ts | 12 ++++++------ 9 files changed, 37 insertions(+), 25 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.3.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.3.sql index 176a32509..cb6a6bee9 100644 --- a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.3.sql +++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.3.sql @@ -23,6 +23,7 @@ set foreign_key_checks = 0; update `t_menu` set menu_name='Apache Flink',order_num=1 where menu_id = 120000; update `t_menu` set order_num=3 where menu_id = 110000; update `t_menu` set order_num=2 where menu_id = 130000; +delete from `t_menu` where menu_id=110300; alter table `t_flink_app` modify column `args` longtext, diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java index 65352306f..154aca57d 100644 --- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java @@ -98,6 +98,8 @@ public class ApplicationBuildPipelineController { public RestResponse buildApplication(Long appId, boolean forceBuild) throws Exception { Application app = applicationService.getById(appId); + ApiAlertException.throwIfNull( + app.getVersionId(), "Please bind a Flink version to the current flink job."); // 1) check flink version FlinkEnv env = flinkEnvService.getById(app.getVersionId()); boolean checkVersion = env.getFlinkVersion().checkVersion(false); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 292ac3db9..b63194c5a 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -505,7 +505,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli return null; } Page<Application> page = MybatisPager.getPage(request); - if (CommonUtils.notEmpty(appParam.getStateArray())) { + if (CommonUtils.notEmpty((Object) appParam.getStateArray())) { if (Arrays.stream(appParam.getStateArray()) .anyMatch(x -> x == FlinkAppState.FINISHED.getValue())) { Integer[] newArray = @@ -529,9 +529,13 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli .peek( record -> { // 1) running Duration - if (record.getTracking() == 1 - && 
record.getFlinkAppStateEnum() == FlinkAppState.RUNNING) { - record.setDuration(now - record.getStartTime().getTime()); + if (record.getTracking() == 1) { + FlinkAppState state = record.getFlinkAppStateEnum(); + if (state == FlinkAppState.RUNNING + || state == FlinkAppState.CANCELLING + || state == FlinkAppState.MAPPING) { + record.setDuration(now - record.getStartTime().getTime()); + } } // 2) k8s restURL if (record.isKubernetesModeJob()) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java index f05cb2afc..226c3a561 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java @@ -820,25 +820,27 @@ public class FlinkAppHttpWatcher { private String jobId; private FlinkAppState appState; private OptionState optionState; + private String jobManagerUrl; @Override - public boolean equals(Object o) { - if (this == o) { + public boolean equals(Object object) { + if (this == object) { return true; } - if (o == null || getClass() != o.getClass()) { + if (object == null || getClass() != object.getClass()) { return false; } - StateChangeEvent event = (StateChangeEvent) o; - return Objects.equals(id, event.id) - && Objects.equals(jobId, event.jobId) - && appState == event.appState - && optionState == event.optionState; + StateChangeEvent that = (StateChangeEvent) object; + return Objects.equals(id, that.id) + && Objects.equals(jobId, that.jobId) + && appState == that.appState + && optionState == that.optionState + && Objects.equals(jobManagerUrl, that.jobManagerUrl); } @Override public int hashCode() { - return Objects.hash(id, jobId, appState, optionState); + 
return Objects.hash(id, jobId, appState, optionState, jobManagerUrl); } public static StateChangeEvent of(Application application) { @@ -847,6 +849,7 @@ public class FlinkAppHttpWatcher { event.setOptionState(OptionState.of(application.getOptionState())); event.setAppState(application.getFlinkAppStateEnum()); event.setJobId(application.getJobId()); + event.setJobManagerUrl(application.getJobManagerUrl()); return event; } } diff --git a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql index 9a8bd45cf..2067af7da 100644 --- a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql +++ b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql @@ -50,7 +50,6 @@ insert into `t_menu` values (120000, 0, 'Apache Flink', '/flink', 'PageView', nu insert into `t_menu` values (130000, 0, 'menu.setting', '/setting', 'PageView', null, null, '0', 1, 2, now(), now()); insert into `t_menu` values (110100, 110000, 'menu.userManagement', '/system/user', 'system/user/User', null, 'user', '0', 1, 1, now(), now()); insert into `t_menu` values (110200, 110000, 'menu.roleManagement', '/system/role', 'system/role/Role', null, 'smile', '0', 1, 2, now(), now()); -insert into `t_menu` values (110300, 110000, 'menu.menuManagement', '/system/menu', 'system/menu/Menu', 'menu:view', 'bars', '0', 1, 3, now(), now()); insert into `t_menu` values (110400, 110000, 'menu.tokenManagement', '/system/token', 'system/token/Token', null, 'lock', '0', 1, 1, now(), now()); insert into `t_menu` values (110500, 110000, 'menu.teamManagement', '/system/team', 'system/team/Team', null, 'team', '0', 1, 2, now(), now()); insert into `t_menu` values (110600, 110000, 'menu.memberManagement', '/system/member', 'system/member/Member', null, 'usergroup-add', '0', 1, 2, now(), now()); diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts index 8e81ba93e..633cfe682 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts @@ -40,8 +40,8 @@ export default { jmMemoryOptions: 'JM内存', tmMemoryOptions: 'TM内存', podTemplate: 'Kubernetes Pod 模板', - flinkCluster: 'Flink集群', - yarnQueue: 'Yarn队列', + flinkCluster: 'Flink 集群', + yarnQueue: 'Yarn 队列', mavenPom: 'maven pom', uploadJar: '上传依赖Jar文件', kubernetesNamespace: 'K8S命名空间', @@ -265,7 +265,7 @@ export default { appNameK8sClusterIdRoleRegexp: '只能由小写字母、数字、字符、和"-" 组成,必须满足正则格式 [a-z]([-a-z0-9]*[a-z0-9])', appNameRoleContent: '字符必须是(中文 或 英文 或 "-" 或 "_"),不能出现两个连续的空格', - flinkClusterIsRequiredMessage: 'Flink集群必填', + flinkClusterIsRequiredMessage: 'Flink 集群必填', flinkSqlIsRequiredMessage: 'Flink SQL必填', tagsPlaceholder: '请输入标签,如果超过一个,用逗号(,)分隔', parallelismPlaceholder: '运行程序的并行度', diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts index b3b8f3183..2bb1875e6 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts @@ -32,7 +32,7 @@ export default { system: '系统设置', alarm: '告警设置', flinkHome: 'Flink 版本', - flinkCluster: 'Flink集群', + flinkCluster: 'Flink 集群', externalLink: '扩展链接', yarnQueue: 'Yarn 队列', }, diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue index 0af94b3bb..399b56425 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue @@ -65,6 
+65,7 @@ }); const appDashboardRef = ref<any>(); + const noData = ref<boolean>(); const yarn = ref<Nullable<string>>(null); const currentTablePage = ref(1); @@ -91,6 +92,7 @@ }, afterFetch: (dataSource) => { const timestamp = new Date().getTime(); + noData.value = dataSource.length == 0; dataSource.forEach((x) => { x.expanded = [ { @@ -333,8 +335,9 @@ </template> <template #insertTable="{ tableContainer }"> <AppTableResize + v-if="!noData" :table-container="tableContainer" - :resize-min="100" + :resize-min="200" v-model:left="tableColumnWidth.jobName" /> </template> diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts index dc89bf451..301be896d 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts @@ -64,12 +64,6 @@ export const useAppTableAction = ( /* Operation button list */ function getActionList(record: AppListRecord, currentPageNo: number): ActionItem[] { return [ - { - tooltip: { title: t('flink.app.operation.edit') }, - auth: 'app:update', - icon: 'clarity:note-edit-line', - onClick: handleEdit.bind(null, record, currentPageNo), - }, { tooltip: { title: t('flink.app.operation.release') }, ifShow: @@ -106,6 +100,12 @@ export const useAppTableAction = ( icon: 'ant-design:pause-circle-outlined', onClick: handleCancel.bind(null, record), }, + { + tooltip: { title: t('flink.app.operation.edit') }, + auth: 'app:update', + icon: 'clarity:note-edit-line', + onClick: handleEdit.bind(null, record, currentPageNo), + }, { tooltip: { title: t('flink.app.operation.detail') }, auth: 'app:detail',
