This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new 92587aae2 [Improve] Flink job status monitoring improvement (#3546)
92587aae2 is described below
commit 92587aae2aaaeb6148e7c986887b4b0e60cd3714
Author: benjobs <[email protected]>
AuthorDate: Mon Feb 12 18:27:59 2024 +0800
[Improve] Flink job status monitoring improvement (#3546)
Co-authored-by: benjobs <[email protected]>
---
.../main/assembly/script/upgrade/mysql/2.1.3.sql | 1 +
.../ApplicationBuildPipelineController.java | 2 ++
.../core/service/impl/ApplicationServiceImpl.java | 12 ++++++++----
.../console/core/task/FlinkAppHttpWatcher.java | 21 ++++++++++++---------
.../src/main/resources/db/data-h2.sql | 1 -
.../src/locales/lang/zh-CN/flink/app.ts | 6 +++---
.../src/locales/lang/zh-CN/menu.ts | 2 +-
.../src/views/flink/app/View.vue | 5 ++++-
.../src/views/flink/app/hooks/useAppTableAction.ts | 12 ++++++------
9 files changed, 37 insertions(+), 25 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.3.sql b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.3.sql
index 176a32509..cb6a6bee9 100644
--- a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.3.sql
+++ b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.3.sql
@@ -23,6 +23,7 @@ set foreign_key_checks = 0;
update `t_menu` set menu_name='Apache Flink',order_num=1 where menu_id = 120000;
update `t_menu` set order_num=3 where menu_id = 110000;
update `t_menu` set order_num=2 where menu_id = 130000;
+delete from `t_menu` where menu_id=110300;
alter table `t_flink_app`
modify column `args` longtext,
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
index 65352306f..154aca57d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationBuildPipelineController.java
@@ -98,6 +98,8 @@ public class ApplicationBuildPipelineController {
public RestResponse buildApplication(Long appId, boolean forceBuild) throws Exception {
Application app = applicationService.getById(appId);
+ ApiAlertException.throwIfNull(
+ app.getVersionId(), "Please bind a Flink version to the current flink job.");
// 1) check flink version
FlinkEnv env = flinkEnvService.getById(app.getVersionId());
boolean checkVersion = env.getFlinkVersion().checkVersion(false);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 292ac3db9..b63194c5a 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -505,7 +505,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
return null;
}
Page<Application> page = MybatisPager.getPage(request);
- if (CommonUtils.notEmpty(appParam.getStateArray())) {
+ if (CommonUtils.notEmpty((Object) appParam.getStateArray())) {
if (Arrays.stream(appParam.getStateArray())
.anyMatch(x -> x == FlinkAppState.FINISHED.getValue())) {
Integer[] newArray =
@@ -529,9 +529,13 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
.peek(
record -> {
// 1) running Duration
- if (record.getTracking() == 1
- && record.getFlinkAppStateEnum() == FlinkAppState.RUNNING) {
- record.setDuration(now - record.getStartTime().getTime());
+ if (record.getTracking() == 1) {
+ FlinkAppState state = record.getFlinkAppStateEnum();
+ if (state == FlinkAppState.RUNNING
+ || state == FlinkAppState.CANCELLING
+ || state == FlinkAppState.MAPPING) {
+ record.setDuration(now - record.getStartTime().getTime());
+ }
}
// 2) k8s restURL
if (record.isKubernetesModeJob()) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index f05cb2afc..226c3a561 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -820,25 +820,27 @@ public class FlinkAppHttpWatcher {
private String jobId;
private FlinkAppState appState;
private OptionState optionState;
+ private String jobManagerUrl;
@Override
- public boolean equals(Object o) {
- if (this == o) {
+ public boolean equals(Object object) {
+ if (this == object) {
return true;
}
- if (o == null || getClass() != o.getClass()) {
+ if (object == null || getClass() != object.getClass()) {
return false;
}
- StateChangeEvent event = (StateChangeEvent) o;
- return Objects.equals(id, event.id)
- && Objects.equals(jobId, event.jobId)
- && appState == event.appState
- && optionState == event.optionState;
+ StateChangeEvent that = (StateChangeEvent) object;
+ return Objects.equals(id, that.id)
+ && Objects.equals(jobId, that.jobId)
+ && appState == that.appState
+ && optionState == that.optionState
+ && Objects.equals(jobManagerUrl, that.jobManagerUrl);
}
@Override
public int hashCode() {
- return Objects.hash(id, jobId, appState, optionState);
+ return Objects.hash(id, jobId, appState, optionState, jobManagerUrl);
}
public static StateChangeEvent of(Application application) {
@@ -847,6 +849,7 @@ public class FlinkAppHttpWatcher {
event.setOptionState(OptionState.of(application.getOptionState()));
event.setAppState(application.getFlinkAppStateEnum());
event.setJobId(application.getJobId());
+ event.setJobManagerUrl(application.getJobManagerUrl());
return event;
}
}
diff --git a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
index 9a8bd45cf..2067af7da 100644
--- a/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
+++ b/streampark-console/streampark-console-service/src/main/resources/db/data-h2.sql
@@ -50,7 +50,6 @@ insert into `t_menu` values (120000, 0, 'Apache Flink', '/flink', 'PageView', nu
insert into `t_menu` values (130000, 0, 'menu.setting', '/setting', 'PageView', null, null, '0', 1, 2, now(), now());
insert into `t_menu` values (110100, 110000, 'menu.userManagement', '/system/user', 'system/user/User', null, 'user', '0', 1, 1, now(), now());
insert into `t_menu` values (110200, 110000, 'menu.roleManagement', '/system/role', 'system/role/Role', null, 'smile', '0', 1, 2, now(), now());
-insert into `t_menu` values (110300, 110000, 'menu.menuManagement', '/system/menu', 'system/menu/Menu', 'menu:view', 'bars', '0', 1, 3, now(), now());
insert into `t_menu` values (110400, 110000, 'menu.tokenManagement', '/system/token', 'system/token/Token', null, 'lock', '0', 1, 1, now(), now());
insert into `t_menu` values (110500, 110000, 'menu.teamManagement', '/system/team', 'system/team/Team', null, 'team', '0', 1, 2, now(), now());
insert into `t_menu` values (110600, 110000, 'menu.memberManagement', '/system/member', 'system/member/Member', null, 'usergroup-add', '0', 1, 2, now(), now());
diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
index 8e81ba93e..633cfe682 100644
--- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
+++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
@@ -40,8 +40,8 @@ export default {
jmMemoryOptions: 'JM内存',
tmMemoryOptions: 'TM内存',
podTemplate: 'Kubernetes Pod 模板',
- flinkCluster: 'Flink集群',
- yarnQueue: 'Yarn队列',
+ flinkCluster: 'Flink 集群',
+ yarnQueue: 'Yarn 队列',
mavenPom: 'maven pom',
uploadJar: '上传依赖Jar文件',
kubernetesNamespace: 'K8S命名空间',
@@ -265,7 +265,7 @@ export default {
appNameK8sClusterIdRoleRegexp:
'只能由小写字母、数字、字符、和"-" 组成,必须满足正则格式 [a-z]([-a-z0-9]*[a-z0-9])',
appNameRoleContent: '字符必须是(中文 或 英文 或 "-" 或 "_"),不能出现两个连续的空格',
- flinkClusterIsRequiredMessage: 'Flink集群必填',
+ flinkClusterIsRequiredMessage: 'Flink 集群必填',
flinkSqlIsRequiredMessage: 'Flink SQL必填',
tagsPlaceholder: '请输入标签,如果超过一个,用逗号(,)分隔',
parallelismPlaceholder: '运行程序的并行度',
diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
index b3b8f3183..2bb1875e6 100644
--- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
+++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
@@ -32,7 +32,7 @@ export default {
system: '系统设置',
alarm: '告警设置',
flinkHome: 'Flink 版本',
- flinkCluster: 'Flink集群',
+ flinkCluster: 'Flink 集群',
externalLink: '扩展链接',
yarnQueue: 'Yarn 队列',
},
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue
index 0af94b3bb..399b56425 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/View.vue
@@ -65,6 +65,7 @@
});
const appDashboardRef = ref<any>();
+ const noData = ref<boolean>();
const yarn = ref<Nullable<string>>(null);
const currentTablePage = ref(1);
@@ -91,6 +92,7 @@
},
afterFetch: (dataSource) => {
const timestamp = new Date().getTime();
+ noData.value = dataSource.length == 0;
dataSource.forEach((x) => {
x.expanded = [
{
@@ -333,8 +335,9 @@
</template>
<template #insertTable="{ tableContainer }">
<AppTableResize
+ v-if="!noData"
:table-container="tableContainer"
- :resize-min="100"
+ :resize-min="200"
v-model:left="tableColumnWidth.jobName"
/>
</template>
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts
index dc89bf451..301be896d 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts
+++ b/streampark-console/streampark-console-webapp/src/views/flink/app/hooks/useAppTableAction.ts
@@ -64,12 +64,6 @@ export const useAppTableAction = (
/* Operation button list */
function getActionList(record: AppListRecord, currentPageNo: number): ActionItem[] {
return [
- {
- tooltip: { title: t('flink.app.operation.edit') },
- auth: 'app:update',
- icon: 'clarity:note-edit-line',
- onClick: handleEdit.bind(null, record, currentPageNo),
- },
{
tooltip: { title: t('flink.app.operation.release') },
ifShow:
@@ -106,6 +100,12 @@ export const useAppTableAction = (
icon: 'ant-design:pause-circle-outlined',
onClick: handleCancel.bind(null, record),
},
+ {
+ tooltip: { title: t('flink.app.operation.edit') },
+ auth: 'app:update',
+ icon: 'clarity:note-edit-line',
+ onClick: handleEdit.bind(null, record, currentPageNo),
+ },
{
tooltip: { title: t('flink.app.operation.detail') },
auth: 'app:detail',