This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new b792a186c [Improve]doris-connector typo improvement (#3516)
b792a186c is described below
commit b792a186c0f66a5e41147415d8dfc19b009b3378
Author: benjobs <[email protected]>
AuthorDate: Sun Jan 28 12:32:50 2024 +0800
[Improve]doris-connector typo improvement (#3516)
* [Improve] FE i18n improvement
* [Improve] doris-connector typo improvement
---------
Co-authored-by: benjobs <[email protected]>
---
.../core/service/impl/ApplicationServiceImpl.java | 1 +
.../core/task/FlinkK8sChangeEventListener.java | 5 +-
.../src/locales/lang/en/flink/app.ts | 6 ++
.../src/locales/lang/zh-CN/flink/app.ts | 5 ++
.../flink/app/components/AppDetail/DetailTab.vue | 96 +++++++++++-----------
.../components/AppView/StartApplicationModal.vue | 23 ++----
.../connector/doris/internal/DorisSinkWriter.java | 2 +-
.../connector/doris/conf/DorisConfig.scala | 2 +-
.../doris/conf/DorisSinkConfigOption.scala | 4 +-
.../kubernetes/watcher/FlinkMetricsWatcher.scala | 39 +++++----
10 files changed, 92 insertions(+), 91 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index f15a48d22..6c1fc29da 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -247,6 +247,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
if (!teamId.equals(app.getTeamId())) {
continue;
}
+ // 1) only yarn-application, yarn-perjob mode
if (app.getJmMemory() != null) {
totalJmMemory += app.getJmMemory();
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index a9e9962f9..9cc077718 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -84,15 +84,16 @@ public class FlinkK8sChangeEventListener {
TrackId trackId = event.trackId();
// get pre application record
Application app = applicationService.getById(trackId.appId());
- if (app == null) {
+ if (app == null || FlinkAppState.isEndState(app.getState())) {
return;
}
+
// update application record
setByJobStatusCV(app, jobStatus);
applicationService.persistMetrics(app);
- // email alerts when necessary
FlinkAppState state = FlinkAppState.of(app.getState());
+ // email alerts when necessary
if (FlinkAppState.FAILED.equals(state)
|| FlinkAppState.LOST.equals(state)
|| FlinkAppState.RESTARTING.equals(state)
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
index 366fef9d7..cd5a17f8e 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
+++
b/streampark-console/streampark-console-webapp/src/locales/lang/en/flink/app.ts
@@ -166,6 +166,12 @@ export default {
start: 'Start Application',
stop: 'Stop application',
savepoint: 'Trigger Savepoint',
+ fromSavepoint: 'From savepoint',
+ savepointTip: 'Restore the job from savepoint or latest checkpoint',
+    savepointInput: 'Select or manually specify the savepoint/checkpoint path',
+    ignoreRestored: 'Skip restore failure',
+    ignoreRestoredTip:
+      'Skip the error and keep the job running when the state fails to restore, same as: -allowNonRestoredState(-n)',
recheck: 'the associated project has changed and this job need to be
rechecked',
changed: 'the application has changed.',
},
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
index 863a909b7..d1f6b79f8 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
+++
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/flink/app.ts
@@ -158,6 +158,11 @@ export default {
savepoint: '触发 Savepoint',
recheck: '关联的项目已更改,需要重新检查此作业',
changed: '应用程序已更改。',
+ fromSavepoint: 'Savepoint 恢复',
+ savepointTip: '作业从 savepoint 或 checkpoint 恢复状态',
+ savepointInput: '选择或者手动指定 savepoint/checkpoint 路径',
+ ignoreRestored: '跳过恢复失败',
+ ignoreRestoredTip: '当状态恢复失败时跳过错误,作业继续运行, 同参数:-allowNonRestoredState(-n)',
},
pod: {
choice: '选择',
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue
index 0d7fa96a2..dc8853e5c 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppDetail/DetailTab.vue
@@ -246,13 +246,13 @@
/* delete configuration */
async function handleDeleteConf(record: Recordable) {
await fetchRemoveConf({ id: record.id });
- reloadConf();
+ await reloadConf();
}
/* delete flink sql */
async function handleDeleteFlinkSql(record: Recordable) {
await fetchRemoveFlinkSql({ id: record.id });
- reloadFlinkSql();
+ await reloadFlinkSql();
}
function handleCompare(record: Recordable) {
@@ -345,17 +345,17 @@
/* delete savePoint */
async function handleDeleteSavePoint(record: Recordable) {
await fetchRemoveSavePoint({ id: record.id });
- reloadSavePoint();
+ await reloadSavePoint();
}
async function handleDeleteBackup(record: Recordable) {
await fetchRemoveBackup(record.id);
- reloadBackup();
+ await reloadBackup();
}
async function handleDeleteOperationLog(record: Recordable) {
await fetchDeleteOperationLog(record.id);
- reloadOperationLog();
+ await reloadOperationLog();
}
/* copy path */
@@ -377,8 +377,43 @@
</script>
<template>
<div>
- <Tabs :defaultActiveKey="1" class="mt-15px" :animated="false"
:tab-bar-gutter="0">
- <TabPane key="1" tab="Option" force-render>
+ <Tabs :defaultActiveKey="1" class="mt-15px" :tab-bar-gutter="0">
+ <TabPane key="1"
:tab="t('flink.app.detail.detailTab.detailTabName.operationLog')">
+ <BasicTable @register="registerLogsTable">
+ <template #bodyCell="{ column, record }">
+ <template v-if="column.dataIndex === 'optionName'">
+ <Tag color="blue" v-if="record.optionName ===
OperationEnum.RELEASE"> Release </Tag>
+ <Tag color="green" v-if="record.optionName ===
OperationEnum.START"> Start </Tag>
+ <Tag color="cyan" v-if="record.optionName ===
OperationEnum.SAVEPOINT">
+ Savepoint
+ </Tag>
+ <Tag color="orange" v-if="record.optionName ===
OperationEnum.CANCEL"> Cancel </Tag>
+ </template>
+ <template v-if="column.dataIndex === 'yarnAppId'">
+ <a type="link" @click="handleYarnUrl(record.yarnAppId)"
target="_blank">
+ {{ record.yarnAppId }}
+ </a>
+ </template>
+ <template v-if="column.dataIndex === 'jobManagerUrl'">
+ <a type="link" :href="record.jobManagerUrl" target="_blank">
+ {{ record.jobManagerUrl }}
+ </a>
+ </template>
+ <template v-if="column.dataIndex === 'optionTime'">
+ <Icon icon="ant-design:clock-circle-outlined" />
+ {{ record.optionTime }}
+ </template>
+ <template v-if="column.dataIndex === 'success'">
+ <Tag class="bold-tag" color="#52c41a" v-if="record.success">
SUCCESS </Tag>
+ <Tag class="bold-tag" color="#f5222d" v-else> FAILED </Tag>
+ </template>
+ <template v-if="column.dataIndex === 'operation'">
+ <TableAction :actions="getOperationLogAction(record)" />
+ </template>
+ </template>
+ </BasicTable>
+ </TabPane>
+ <TabPane key="2"
:tab="t('flink.app.detail.detailTab.detailTabName.option')">
<Descriptions bordered size="middle" layout="vertical">
<DescriptionItem v-for="(v, k) in JSON.parse(app.options || '{}')"
:key="k" :label="k">
{{ v }}
@@ -386,7 +421,7 @@
</Descriptions>
</TabPane>
<TabPane
- key="2"
+ key="3"
:tab="t('flink.app.detail.detailTab.detailTabName.configuration')"
v-if="app && app.appType === AppTypeEnum.STREAMPARK_FLINK &&
tabConf.showConf"
>
@@ -424,7 +459,7 @@
</BasicTable>
</TabPane>
<TabPane
- key="3"
+ key="4"
:tab="t('flink.app.detail.detailTab.detailTabName.flinkSql')"
v-if="app.jobType === JobTypeEnum.SQL"
>
@@ -472,7 +507,7 @@
</BasicTable>
</TabPane>
<TabPane
- key="4"
+ key="5"
:tab="t('flink.app.detail.detailTab.detailTabName.savepoint')"
v-if="tabConf.showSaveOption"
>
@@ -500,7 +535,7 @@
</BasicTable>
</TabPane>
<TabPane
- key="5"
+ key="6"
:tab="t('flink.app.detail.detailTab.detailTabName.backup')"
v-if="tabConf.showBackup"
>
@@ -517,45 +552,6 @@
</template>
</BasicTable>
</TabPane>
- <TabPane
- key="6"
- :tab="t('flink.app.detail.detailTab.detailTabName.operationLog')"
- v-if="tabConf.showOptionLog"
- >
- <BasicTable @register="registerLogsTable">
- <template #bodyCell="{ column, record }">
- <template v-if="column.dataIndex === 'optionName'">
- <Tag color="blue" v-if="record.optionName ===
OperationEnum.RELEASE"> Release </Tag>
- <Tag color="green" v-if="record.optionName ===
OperationEnum.START"> Start </Tag>
- <Tag color="cyan" v-if="record.optionName ===
OperationEnum.SAVEPOINT">
- Savepoint
- </Tag>
- <Tag color="orange" v-if="record.optionName ===
OperationEnum.CANCEL"> Cancel </Tag>
- </template>
- <template v-if="column.dataIndex === 'yarnAppId'">
- <a type="link" @click="handleYarnUrl(record.yarnAppId)"
target="_blank">
- {{ record.yarnAppId }}
- </a>
- </template>
- <template v-if="column.dataIndex === 'jobManagerUrl'">
- <a type="link" :href="record.jobManagerUrl" target="_blank">
- {{ record.jobManagerUrl }}
- </a>
- </template>
- <template v-if="column.dataIndex === 'optionTime'">
- <Icon icon="ant-design:clock-circle-outlined" />
- {{ record.optionTime }}
- </template>
- <template v-if="column.dataIndex === 'success'">
- <Tag class="bold-tag" color="#52c41a" v-if="record.success">
SUCCESS </Tag>
- <Tag class="bold-tag" color="#f5222d" v-else> FAILED </Tag>
- </template>
- <template v-if="column.dataIndex === 'operation'">
- <TableAction :actions="getOperationLogAction(record)" />
- </template>
- </template>
- </BasicTable>
- </TabPane>
</Tabs>
<CompareModal @register="registerCompare" />
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
index 205b6173f..88632acbb 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
@@ -59,47 +59,36 @@
schemas: [
{
field: 'startSavePointed',
- label: 'from savepoint',
+ label: t('flink.app.view.fromSavepoint'),
component: 'Switch',
componentProps: {
checkedChildren: 'ON',
unCheckedChildren: 'OFF',
},
defaultValue: true,
- afterItem: () =>
- h(
- 'span',
- { class: 'conf-switch' },
- 'restore the application from savepoint or latest checkpoint',
- ),
+ afterItem: () => h('span', { class: 'conf-switch' },
t('flink.app.view.savepointTip')),
},
{
field: 'startSavePoint',
- label: 'savepoint',
+ label: 'Savepoint',
component:
receiveData.historySavePoint && receiveData.historySavePoint.length
> 0
? 'Select'
: 'Input',
- afterItem: () =>
- h(
- 'span',
- { class: 'conf-switch' },
- 'restore the application from savepoint or latest checkpoint',
- ),
+ afterItem: () => h('span', { class: 'conf-switch' },
t('flink.app.view.savepointInput')),
slot: 'savepoint',
ifShow: ({ values }) => values.startSavePointed,
required: true,
},
{
field: 'allowNonRestoredState',
- label: 'ignore restored',
+ label: t('flink.app.view.ignoreRestored'),
component: 'Switch',
componentProps: {
checkedChildren: 'ON',
unCheckedChildren: 'OFF',
},
- afterItem: () =>
- h('span', { class: 'conf-switch' }, 'ignore savepoint then cannot be
restored'),
+ afterItem: () => h('span', { class: 'conf-switch' },
t('flink.app.view.ignoreRestoredTip')),
defaultValue: false,
ifShow: ({ values }) => values.startSavePointed,
},
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java
index a6f0d5317..160b32047 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java
@@ -166,7 +166,7 @@ public class DorisSinkWriter implements Serializable {
final String bufferKey = String.format("%s.%s", database, table);
final DorisSinkBufferEntry bufferEntity =
bufferMap.computeIfAbsent(
- bufferKey, k -> new DorisSinkBufferEntry(database, table,
dorisConfig.lablePrefix()));
+ bufferKey, k -> new DorisSinkBufferEntry(database, table,
dorisConfig.labelPrefix()));
for (String record : records) {
byte[] bts = record.getBytes(StandardCharsets.UTF_8);
bufferEntity.addToBuffer(bts);
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala
index a38f2cee3..f031282a6 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala
@@ -58,7 +58,7 @@ class DorisConfig(parameters: Properties) {
val sinkOfferTimeout: Long = sinkOption.sinkOfferTimeout.get()
- val lablePrefix: String = sinkOption.lablePrefix.get()
+ val labelPrefix: String = sinkOption.labelPrefix.get()
val semantic: String = sinkOption.semantic.get()
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisSinkConfigOption.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisSinkConfigOption.scala
index a4940df84..6ec700b32 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisSinkConfigOption.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisSinkConfigOption.scala
@@ -126,8 +126,8 @@ class DorisSinkConfigOption(prefixStr: String, properties:
Properties) extends S
val maxRetries: ConfigOption[Int] =
ConfigOption(key = "maxRetries", required = false, defaultValue = 1,
classType = classOf[Int])
- val lablePrefix: ConfigOption[String] = ConfigOption(
- key = "lablePrefix",
+ val labelPrefix: ConfigOption[String] = ConfigOption(
+ key = "labelPrefix",
required = false,
defaultValue = "doris",
classType = classOf[String])
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
index 4dac87f53..c8431a7dd 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
@@ -19,6 +19,7 @@ package org.apache.streampark.flink.kubernetes.watcher
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.{ChangeEventBus,
FlinkK8sWatchController, KubernetesRetriever, MetricWatcherConfig}
+import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
import
org.apache.streampark.flink.kubernetes.event.FlinkClusterMetricChangeEvent
import org.apache.streampark.flink.kubernetes.model.{ClusterKey,
FlinkMetricCV, TrackId}
@@ -81,25 +82,27 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig =
MetricWatcherConfig.default
)
// retrieve flink metrics in thread pool
val futures: Set[Future[Option[FlinkMetricCV]]] =
- trackIds.map(
- id => {
- val future = Future(collectMetrics(id))
- future.onComplete(_.getOrElse(None) match {
- case Some(metric) =>
- val clusterKey = id.toClusterKey
- // update current flink cluster metrics on cache
- watchController.flinkMetrics.put(clusterKey, metric)
- val isMetricChanged = {
- val preMetric = watchController.flinkMetrics.get(clusterKey)
- preMetric == null || !preMetric.equalsPayload(metric)
- }
- if (isMetricChanged) {
- eventBus.postAsync(FlinkClusterMetricChangeEvent(id, metric))
- }
- case _ =>
+ trackIds
+ .filter(_.executeMode == FlinkK8sExecuteMode.SESSION)
+ .map(
+ id => {
+ val future = Future(collectMetrics(id))
+ future.onComplete(_.getOrElse(None) match {
+ case Some(metric) =>
+ val clusterKey = id.toClusterKey
+ // update current flink cluster metrics on cache
+ watchController.flinkMetrics.put(clusterKey, metric)
+ val isMetricChanged = {
+ val preMetric = watchController.flinkMetrics.get(clusterKey)
+ preMetric == null || !preMetric.equalsPayload(metric)
+ }
+ if (isMetricChanged) {
+ eventBus.postAsync(FlinkClusterMetricChangeEvent(id, metric))
+ }
+ case _ =>
+ })
+ future
})
- future
- })
// blocking until all future are completed or timeout is reached
Try(Await.ready(Future.sequence(futures), conf.requestTimeoutSec
seconds)).failed.map {
_ =>