This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch doris-connector in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 34990ba6ded8a2692fe65638f07df1dea3dae995 Author: benjobs <[email protected]> AuthorDate: Sun Jan 28 12:21:52 2024 +0800 [Improve] FE i18n improvement --- .../core/service/impl/ApplicationServiceImpl.java | 1 + .../core/task/FlinkK8sChangeEventListener.java | 5 +- .../src/locales/lang/en/flink/app.ts | 6 ++ .../src/locales/lang/zh-CN/flink/app.ts | 5 ++ .../flink/app/components/AppDetail/DetailTab.vue | 96 +++++++++++----------- .../components/AppView/StartApplicationModal.vue | 23 ++---- .../kubernetes/watcher/FlinkMetricsWatcher.scala | 39 +++++---- 7 files changed, 88 insertions(+), 87 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index f15a48d22..6c1fc29da 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -247,6 +247,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli if (!teamId.equals(app.getTeamId())) { continue; } + // 1) only yarn-application, yarn-perjob mode if (app.getJmMemory() != null) { totalJmMemory += app.getJmMemory(); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java index a9e9962f9..9cc077718 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java @@ -84,15 +84,16 @@ public class FlinkK8sChangeEventListener { TrackId trackId = event.trackId(); // get pre application record Application app = applicationService.getById(trackId.appId()); - if (app == null) { + if (app == null || FlinkAppState.isEndState(app.getState())) { return; } + // update application record setByJobStatusCV(app, jobStatus); applicationService.persistMetrics(app); - // email alerts when necessary FlinkAppState state = FlinkAppState.of(app.getState()); + // email alerts when necessary if (FlinkAppState.FAILED.equals(state) || FlinkAppState.LOST.equals(state) || FlinkAppState.RESTARTING.equals(state) diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts index 366fef9d7..cd5a17f8e 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts @@ -166,6 +166,12 @@ export default { start: 'Start Application', stop: 'Stop application', savepoint: 'Trigger Savepoint', + fromSavepoint: 'From savepoint', + savepointTip: 'Restore the job from savepoint or latest checkpoint', + savepointInput: + 'Select or manually specify the savepoint/checkpoint path, Same as:-allowNonRestoredState(-n)', + ignoreRestored: 'Ignore failed', + ignoreRestoredTip: 'ignore savepoint then cannot be restored,', recheck: 'the associated project has changed and this job need to be rechecked', changed: 'the application has changed.', }, diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts index 863a909b7..d1f6b79f8 100644 --- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts +++ b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts @@ -158,6 +158,11 @@ export default { savepoint: '触发 Savepoint', recheck: '关联的项目已更改,需要重新检查此作业', changed: '应用程序已更改。', + fromSavepoint: 'Savepoint 恢复', + savepointTip: '作业从 savepoint 或 checkpoint 恢复状态', + savepointInput: '选择或者手动指定 savepoint/checkpoint 路径', + ignoreRestored: '跳过恢复失败', + ignoreRestoredTip: '当状态恢复失败时跳过错误,作业继续运行, 同参数:-allowNonRestoredState(-n)', }, pod: { choice: '选择', diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue index 0d7fa96a2..dc8853e5c 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue @@ -246,13 +246,13 @@ /* delete configuration */ async function handleDeleteConf(record: Recordable) { await fetchRemoveConf({ id: record.id }); - reloadConf(); + await reloadConf(); } /* delete flink sql */ async function handleDeleteFlinkSql(record: Recordable) { await fetchRemoveFlinkSql({ id: record.id }); - reloadFlinkSql(); + await reloadFlinkSql(); } function handleCompare(record: Recordable) { @@ -345,17 +345,17 @@ /* delete savePoint */ async function handleDeleteSavePoint(record: Recordable) { await fetchRemoveSavePoint({ id: record.id }); - reloadSavePoint(); + await reloadSavePoint(); } async function handleDeleteBackup(record: Recordable) { await fetchRemoveBackup(record.id); - reloadBackup(); + await reloadBackup(); } async function handleDeleteOperationLog(record: Recordable) { await fetchDeleteOperationLog(record.id); - reloadOperationLog(); + await reloadOperationLog(); } /* copy path */ @@ -377,8 +377,43 @@ </script> <template> <div> - <Tabs :defaultActiveKey="1" class="mt-15px" :animated="false" :tab-bar-gutter="0"> - <TabPane key="1" tab="Option" force-render> + <Tabs :defaultActiveKey="1" class="mt-15px" :tab-bar-gutter="0"> + <TabPane key="1" :tab="t('flink.app.detail.detailTab.detailTabName.operationLog')"> + <BasicTable @register="registerLogsTable"> + <template #bodyCell="{ column, record }"> + <template v-if="column.dataIndex === 'optionName'"> + <Tag color="blue" v-if="record.optionName === OperationEnum.RELEASE"> Release </Tag> + <Tag color="green" v-if="record.optionName === OperationEnum.START"> Start </Tag> + <Tag color="cyan" v-if="record.optionName === OperationEnum.SAVEPOINT"> + Savepoint + </Tag> + <Tag color="orange" v-if="record.optionName === OperationEnum.CANCEL"> Cancel </Tag> + </template> + <template v-if="column.dataIndex === 'yarnAppId'"> + <a type="link" @click="handleYarnUrl(record.yarnAppId)" target="_blank"> + {{ record.yarnAppId }} + </a> + </template> + <template v-if="column.dataIndex === 'jobManagerUrl'"> + <a type="link" :href="record.jobManagerUrl" target="_blank"> + {{ record.jobManagerUrl }} + </a> + </template> + <template v-if="column.dataIndex === 'optionTime'"> + <Icon icon="ant-design:clock-circle-outlined" /> + {{ record.optionTime }} + </template> + <template v-if="column.dataIndex === 'success'"> + <Tag class="bold-tag" color="#52c41a" v-if="record.success"> SUCCESS </Tag> + <Tag class="bold-tag" color="#f5222d" v-else> FAILED </Tag> + </template> + <template v-if="column.dataIndex === 'operation'"> + <TableAction :actions="getOperationLogAction(record)" /> + </template> + </template> + </BasicTable> + </TabPane> + <TabPane key="2" :tab="t('flink.app.detail.detailTab.detailTabName.option')"> <Descriptions bordered size="middle" layout="vertical"> <DescriptionItem v-for="(v, k) in JSON.parse(app.options || '{}')" :key="k" :label="k"> {{ v }} @@ -386,7 +421,7 @@ </Descriptions> </TabPane> <TabPane - key="2" + key="3" :tab="t('flink.app.detail.detailTab.detailTabName.configuration')" v-if="app && app.appType === AppTypeEnum.STREAMPARK_FLINK && tabConf.showConf" > @@ -424,7 +459,7 @@ </BasicTable> </TabPane> <TabPane - key="3" + key="4" :tab="t('flink.app.detail.detailTab.detailTabName.flinkSql')" v-if="app.jobType === JobTypeEnum.SQL" > @@ -472,7 +507,7 @@ </BasicTable> </TabPane> <TabPane - key="4" + key="5" :tab="t('flink.app.detail.detailTab.detailTabName.savepoint')" v-if="tabConf.showSaveOption" > @@ -500,7 +535,7 @@ </BasicTable> </TabPane> <TabPane - key="5" + key="6" :tab="t('flink.app.detail.detailTab.detailTabName.backup')" v-if="tabConf.showBackup" > @@ -517,45 +552,6 @@ </template> </BasicTable> </TabPane> - <TabPane - key="6" - :tab="t('flink.app.detail.detailTab.detailTabName.operationLog')" - v-if="tabConf.showOptionLog" - > - <BasicTable @register="registerLogsTable"> - <template #bodyCell="{ column, record }"> - <template v-if="column.dataIndex === 'optionName'"> - <Tag color="blue" v-if="record.optionName === OperationEnum.RELEASE"> Release </Tag> - <Tag color="green" v-if="record.optionName === OperationEnum.START"> Start </Tag> - <Tag color="cyan" v-if="record.optionName === OperationEnum.SAVEPOINT"> - Savepoint - </Tag> - <Tag color="orange" v-if="record.optionName === OperationEnum.CANCEL"> Cancel </Tag> - </template> - <template v-if="column.dataIndex === 'yarnAppId'"> - <a type="link" @click="handleYarnUrl(record.yarnAppId)" target="_blank"> - {{ record.yarnAppId }} - </a> - </template> - <template v-if="column.dataIndex === 'jobManagerUrl'"> - <a type="link" :href="record.jobManagerUrl" target="_blank"> - {{ record.jobManagerUrl }} - </a> - </template> - <template v-if="column.dataIndex === 'optionTime'"> - <Icon icon="ant-design:clock-circle-outlined" /> - {{ record.optionTime }} - </template> - <template v-if="column.dataIndex === 'success'"> - <Tag class="bold-tag" color="#52c41a" v-if="record.success"> SUCCESS </Tag> - <Tag class="bold-tag" color="#f5222d" v-else> FAILED </Tag> - </template> - <template v-if="column.dataIndex === 'operation'"> - <TableAction :actions="getOperationLogAction(record)" /> - </template> - </template> - </BasicTable> - </TabPane> </Tabs> <CompareModal @register="registerCompare" /> diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue index 205b6173f..88632acbb 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue @@ -59,47 +59,36 @@ schemas: [ { field: 'startSavePointed', - label: 'from savepoint', + label: t('flink.app.view.fromSavepoint'), component: 'Switch', componentProps: { checkedChildren: 'ON', unCheckedChildren: 'OFF', }, defaultValue: true, - afterItem: () => - h( - 'span', - { class: 'conf-switch' }, - 'restore the application from savepoint or latest checkpoint', - ), + afterItem: () => h('span', { class: 'conf-switch' }, t('flink.app.view.savepointTip')), }, { field: 'startSavePoint', - label: 'savepoint', + label: 'Savepoint', component: receiveData.historySavePoint && receiveData.historySavePoint.length > 0 ? 'Select' : 'Input', - afterItem: () => - h( - 'span', - { class: 'conf-switch' }, - 'restore the application from savepoint or latest checkpoint', - ), + afterItem: () => h('span', { class: 'conf-switch' }, t('flink.app.view.savepointInput')), slot: 'savepoint', ifShow: ({ values }) => values.startSavePointed, required: true, }, { field: 'allowNonRestoredState', - label: 'ignore restored', + label: t('flink.app.view.ignoreRestored'), component: 'Switch', componentProps: { checkedChildren: 'ON', unCheckedChildren: 'OFF', }, - afterItem: () => - h('span', { class: 'conf-switch' }, 'ignore savepoint then cannot be restored'), + afterItem: () => h('span', { class: 'conf-switch' }, t('flink.app.view.ignoreRestoredTip')), defaultValue: false, ifShow: ({ values }) => values.startSavePointed, }, diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala index 4dac87f53..c8431a7dd 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala @@ -19,6 +19,7 @@ package org.apache.streampark.flink.kubernetes.watcher import org.apache.streampark.common.util.Logger import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkK8sWatchController, KubernetesRetriever, MetricWatcherConfig} +import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode import org.apache.streampark.flink.kubernetes.event.FlinkClusterMetricChangeEvent import org.apache.streampark.flink.kubernetes.model.{ClusterKey, FlinkMetricCV, TrackId} @@ -81,25 +82,27 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default ) // retrieve flink metrics in thread pool val futures: Set[Future[Option[FlinkMetricCV]]] = - trackIds.map( - id => { - val future = Future(collectMetrics(id)) - future.onComplete(_.getOrElse(None) match { - case Some(metric) => - val clusterKey = id.toClusterKey - // update current flink cluster metrics on cache - watchController.flinkMetrics.put(clusterKey, metric) - val isMetricChanged = { - val preMetric = watchController.flinkMetrics.get(clusterKey) - preMetric == null || !preMetric.equalsPayload(metric) - } - if (isMetricChanged) { - eventBus.postAsync(FlinkClusterMetricChangeEvent(id, metric)) - } - case _ => + trackIds + .filter(_.executeMode == FlinkK8sExecuteMode.SESSION) + .map( + id => { + val future = Future(collectMetrics(id)) + future.onComplete(_.getOrElse(None) match { + case Some(metric) => + val clusterKey = id.toClusterKey + // update current flink cluster metrics on cache + watchController.flinkMetrics.put(clusterKey, metric) + val isMetricChanged = { + val preMetric = watchController.flinkMetrics.get(clusterKey) + preMetric == null || !preMetric.equalsPayload(metric) + } + if (isMetricChanged) { + eventBus.postAsync(FlinkClusterMetricChangeEvent(id, metric)) + } + case _ => + }) + future }) - future - }) // blocking until all future are completed or timeout is reached Try(Await.ready(Future.sequence(futures), conf.requestTimeoutSec seconds)).failed.map { _ =>
