This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new d83a2939a [Feature][ISSUE-2780] support set RestoreMode when starting
app (#2782)
d83a2939a is described below
commit d83a2939ae3b059a9e816f65bba1d51585d90c36
Author: zhoulii <[email protected]>
AuthorDate: Wed May 31 23:35:03 2023 +0800
[Feature][ISSUE-2780] support set RestoreMode when starting app (#2782)
* [Feature] support set RestoreMode when starting app
* [FIX] check flink version in fe&be
---------
Co-authored-by: zhoulii <[email protected]>
---
.../streampark/common/enums/RestoreMode.java | 48 ++++++++++++++++++++++
.../streampark/common/conf/FlinkVersion.scala | 7 ++++
.../console/core/entity/Application.java | 2 +-
.../core/service/impl/ApplicationServiceImpl.java | 2 +
.../src/enums/flinkEnum.ts | 6 +++
.../components/AppView/StartApplicationModal.vue | 41 +++++++++++++++++-
.../flink/client/bean/SubmitRequest.scala | 1 +
.../flink/client/trait/FlinkClientTrait.scala | 8 +++-
8 files changed, 112 insertions(+), 3 deletions(-)
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/RestoreMode.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/RestoreMode.java
new file mode 100644
index 000000000..75ecf5b4d
--- /dev/null
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/RestoreMode.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.enums;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+public enum RestoreMode implements Serializable {
+ CLAIM(1),
+ NO_CLAIM(2),
+ LEGACY(3);
+
+ public static final String RESTORE_MODE = "execution.savepoint-restore-mode";
+ public static final int SINCE_FLINK_VERSION = 15;
+
+ private final int value;
+
+ public int get() {
+ return this.value;
+ }
+
+ RestoreMode(int value) {
+ this.value = value;
+ }
+
+ public String getName() {
+ return RestoreMode.of(this.value).toString();
+ }
+
+ public static RestoreMode of(Integer value) {
+ return Arrays.stream(values()).filter((x) -> x.value ==
value).findFirst().orElse(null);
+ }
+}
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
index e2182c650..5c23b4772 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
@@ -125,6 +125,13 @@ class FlinkVersion(val flinkHome: String) extends
java.io.Serializable with Logg
}
}
+ def checkVersion(sinceVersion: Int): Boolean = {
+ version.split("\\.").map(_.trim.toInt) match {
+ case Array(1, v, _) if v >= sinceVersion => true
+ case _ => false
+ }
+ }
+
// StreamPark flink shims version, like "streampark-flink-shims_flink-1.13"
lazy val shimsVersion: String = s"streampark-flink-shims_flink-$majorVersion"
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 3c352a5f9..4fc0cf147 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -231,8 +231,8 @@ public class Application implements Serializable {
private transient String savePoint;
private transient Boolean savePointed = false;
private transient Boolean drain = false;
- private transient Long savePointTimeout = 60L;
private transient Boolean allowNonRestored = false;
+ private transient Integer restoreMode;
private transient String socketId;
private transient String projectName;
private transient String createTimeFrom;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index f74650e40..73e76e6e9 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -22,6 +22,7 @@ import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.DevelopmentMode;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.enums.ResolveOrder;
+import org.apache.streampark.common.enums.RestoreMode;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.HdfsOperator;
import org.apache.streampark.common.fs.LfsOperator;
@@ -1551,6 +1552,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
appConf,
application.getApplicationType(),
getSavePointed(appParam),
+ appParam.getRestoreMode() == null ? null :
RestoreMode.of(appParam.getRestoreMode()),
applicationArgs,
buildResult,
kubernetesSubmitParam,
diff --git
a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
index b664e601c..2c485c592 100644
--- a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
+++ b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
@@ -228,3 +228,9 @@ export enum FailoverStrategyEnum {
ALERT = 1,
RESTART = 2,
}
+
+export enum RestoreModeEnum {
+ CLAIM = 1,
+ NO_CLAIM = 2,
+ LEGACY = 3,
+}
diff --git
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
index ad2db10d4..fec877c32 100644
---
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
+++
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
@@ -24,7 +24,7 @@
});
</script>
<script setup lang="ts" name="StartApplicationModal">
- import { h } from 'vue';
+ import { h, onMounted, ref, unref } from 'vue';
import { Select, Input, Tag } from 'ant-design-vue';
import { BasicForm, useForm } from '/@/components/Form';
import { SvgIcon, Icon } from '/@/components/Icon';
@@ -32,6 +32,9 @@
import { useMessage } from '/@/hooks/web/useMessage';
import { useRouter } from 'vue-router';
import { fetchStart } from '/@/api/flink/app/app';
+ import { RestoreModeEnum } from '/@/enums/flinkEnum';
+ import { fetchFlinkEnv } from '/@/api/flink/setting/flinkEnv';
+ import { FlinkEnv } from '/@/api/flink/setting/types/flinkEnv.type';
const SelectOption = Select.Option;
@@ -42,6 +45,8 @@
const emits = defineEmits(['register', 'updateOption']);
const receiveData = reactive<Recordable>({});
+ const flinkEnvs = ref<FlinkEnv[]>([]);
+
const [registerModal, { closeModal }] = useModalInner((data) => {
if (data) {
Object.assign(receiveData, data);
@@ -89,6 +94,27 @@
ifShow: ({ values }) => values.startSavePointed,
required: true,
},
+ {
+ field: 'restoreMode',
+ label: 'restore mode',
+ component: 'Select',
+ defaultValue: RestoreModeEnum.NO_CLAIM,
+ componentProps: {
+ options: [
+ { label: 'CLAIM', value: RestoreModeEnum.CLAIM },
+ { label: 'NO_CLAIM', value: RestoreModeEnum.NO_CLAIM },
+ { label: 'LEGACY', value: RestoreModeEnum.LEGACY },
+ ],
+ },
+ afterItem: () =>
+ h(
+ 'span',
+ { class: 'conf-switch' },
+ 'restore mode is supported since flink 1.15, usually, you do not
have to set this parameter',
+ ),
+ ifShow: ({ values }) =>
+ values.startSavePointed &&
checkFlinkVersion(receiveData.application.versionId),
+ },
{
field: 'allowNonRestoredState',
label: 'ignore restored',
@@ -116,8 +142,10 @@
const formValue = (await validate()) as Recordable;
const savePointed = formValue.startSavePointed;
const savePointPath = savePointed ? formValue['startSavePoint'] : null;
+ const restoreMode = savePointed ? formValue['restoreMode'] : null;
const { data } = await fetchStart({
id: receiveData.application.id,
+ restoreMode,
savePointed,
savePoint: savePointPath,
allowNonRestored: formValue.allowNonRestoredState || false,
@@ -162,6 +190,17 @@
console.error(error);
}
}
+
+ function checkFlinkVersion(versionId: string) {
+ let env = unref(flinkEnvs).filter((env) => env.id == versionId)[0];
+ return parseInt(env.version.split('.')[1]) >= 15;
+ }
+
+ onMounted(() => {
+ fetchFlinkEnv().then((res) => {
+ flinkEnvs.value = res;
+ });
+ });
</script>
<template>
<BasicModal
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 1d7725fbb..2464e26f1 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -61,6 +61,7 @@ case class SubmitRequest(
appConf: String,
applicationType: ApplicationType,
savePoint: String,
+ restoreMode: RestoreMode,
args: String,
@Nullable buildResult: BuildResult,
@Nullable k8sSubmitParam: KubernetesSubmitParam,
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 325bc9ff3..64c67098e 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.flink.client.`trait`
import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.conf.Workspace
-import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode,
ExecutionMode}
+import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode,
ExecutionMode, RestoreMode}
import org.apache.streampark.common.util.{DeflaterUtils, Logger}
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.core.FlinkClusterClient
@@ -115,6 +115,12 @@ trait FlinkClientTrait extends Logger {
flinkConfig.setBoolean(
SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
submitRequest.allowNonRestoredState)
+ if (
+ submitRequest.flinkVersion.checkVersion(
+ RestoreMode.SINCE_FLINK_VERSION) && submitRequest.restoreMode != null
+ ) {
+ flinkConfig.setString(RestoreMode.RESTORE_MODE,
submitRequest.restoreMode.getName);
+ }
}
// set JVMOptions..