This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new d83a2939a [Feature][ISSUE-2780] support set RestoreMode when starting 
app (#2782)
d83a2939a is described below

commit d83a2939ae3b059a9e816f65bba1d51585d90c36
Author: zhoulii <[email protected]>
AuthorDate: Wed May 31 23:35:03 2023 +0800

    [Feature][ISSUE-2780] support set RestoreMode when starting app (#2782)
    
    * [Feature] support set RestoreMode when starting app
    
    * [FIX] check flink version in fe&be
    
    ---------
    
    Co-authored-by: zhoulii <[email protected]>
---
 .../streampark/common/enums/RestoreMode.java       | 48 ++++++++++++++++++++++
 .../streampark/common/conf/FlinkVersion.scala      |  7 ++++
 .../console/core/entity/Application.java           |  2 +-
 .../core/service/impl/ApplicationServiceImpl.java  |  2 +
 .../src/enums/flinkEnum.ts                         |  6 +++
 .../components/AppView/StartApplicationModal.vue   | 41 +++++++++++++++++-
 .../flink/client/bean/SubmitRequest.scala          |  1 +
 .../flink/client/trait/FlinkClientTrait.scala      |  8 +++-
 8 files changed, 112 insertions(+), 3 deletions(-)

diff --git 
a/streampark-common/src/main/java/org/apache/streampark/common/enums/RestoreMode.java
 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/RestoreMode.java
new file mode 100644
index 000000000..75ecf5b4d
--- /dev/null
+++ 
b/streampark-common/src/main/java/org/apache/streampark/common/enums/RestoreMode.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.common.enums;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+public enum RestoreMode implements Serializable {
+  CLAIM(1),
+  NO_CLAIM(2),
+  LEGACY(3);
+
+  public static final String RESTORE_MODE = "execution.savepoint-restore-mode";
+  public static final int SINCE_FLINK_VERSION = 15;
+
+  private final int value;
+
+  public int get() {
+    return this.value;
+  }
+
+  RestoreMode(int value) {
+    this.value = value;
+  }
+
+  public String getName() {
+    return RestoreMode.of(this.value).toString();
+  }
+
+  public static RestoreMode of(Integer value) {
+    return Arrays.stream(values()).filter((x) -> x.value == 
value).findFirst().orElse(null);
+  }
+}
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
index e2182c650..5c23b4772 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
@@ -125,6 +125,13 @@ class FlinkVersion(val flinkHome: String) extends 
java.io.Serializable with Logg
     }
   }
 
+  def checkVersion(sinceVersion: Int): Boolean = {
+    version.split("\\.").map(_.trim.toInt) match {
+      case Array(1, v, _) if v >= sinceVersion => true
+      case _ => false
+    }
+  }
+
   // StreamPark flink shims version, like "streampark-flink-shims_flink-1.13"
   lazy val shimsVersion: String = s"streampark-flink-shims_flink-$majorVersion"
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 3c352a5f9..4fc0cf147 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -231,8 +231,8 @@ public class Application implements Serializable {
   private transient String savePoint;
   private transient Boolean savePointed = false;
   private transient Boolean drain = false;
-  private transient Long savePointTimeout = 60L;
   private transient Boolean allowNonRestored = false;
+  private transient Integer restoreMode;
   private transient String socketId;
   private transient String projectName;
   private transient String createTimeFrom;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index f74650e40..73e76e6e9 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -22,6 +22,7 @@ import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.DevelopmentMode;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.enums.ResolveOrder;
+import org.apache.streampark.common.enums.RestoreMode;
 import org.apache.streampark.common.enums.StorageType;
 import org.apache.streampark.common.fs.HdfsOperator;
 import org.apache.streampark.common.fs.LfsOperator;
@@ -1551,6 +1552,7 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
             appConf,
             application.getApplicationType(),
             getSavePointed(appParam),
+            appParam.getRestoreMode() == null ? null : 
RestoreMode.of(appParam.getRestoreMode()),
             applicationArgs,
             buildResult,
             kubernetesSubmitParam,
diff --git 
a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts 
b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
index b664e601c..2c485c592 100644
--- a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
+++ b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts
@@ -228,3 +228,9 @@ export enum FailoverStrategyEnum {
   ALERT = 1,
   RESTART = 2,
 }
+
+export enum RestoreModeEnum {
+  CLAIM = 1,
+  NO_CLAIM = 2,
+  LEGACY = 3,
+}
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
index ad2db10d4..fec877c32 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/components/AppView/StartApplicationModal.vue
@@ -24,7 +24,7 @@
   });
 </script>
 <script setup lang="ts" name="StartApplicationModal">
-  import { h } from 'vue';
+  import { h, onMounted, ref, unref } from 'vue';
   import { Select, Input, Tag } from 'ant-design-vue';
   import { BasicForm, useForm } from '/@/components/Form';
   import { SvgIcon, Icon } from '/@/components/Icon';
@@ -32,6 +32,9 @@
   import { useMessage } from '/@/hooks/web/useMessage';
   import { useRouter } from 'vue-router';
   import { fetchStart } from '/@/api/flink/app/app';
+  import { RestoreModeEnum } from '/@/enums/flinkEnum';
+  import { fetchFlinkEnv } from '/@/api/flink/setting/flinkEnv';
+  import { FlinkEnv } from '/@/api/flink/setting/types/flinkEnv.type';
 
   const SelectOption = Select.Option;
 
@@ -42,6 +45,8 @@
   const emits = defineEmits(['register', 'updateOption']);
   const receiveData = reactive<Recordable>({});
 
+  const flinkEnvs = ref<FlinkEnv[]>([]);
+
   const [registerModal, { closeModal }] = useModalInner((data) => {
     if (data) {
       Object.assign(receiveData, data);
@@ -89,6 +94,27 @@
         ifShow: ({ values }) => values.startSavePointed,
         required: true,
       },
+      {
+        field: 'restoreMode',
+        label: 'restore mode',
+        component: 'Select',
+        defaultValue: RestoreModeEnum.NO_CLAIM,
+        componentProps: {
+          options: [
+            { label: 'CLAIM', value: RestoreModeEnum.CLAIM },
+            { label: 'NO_CLAIM', value: RestoreModeEnum.NO_CLAIM },
+            { label: 'LEGACY', value: RestoreModeEnum.LEGACY },
+          ],
+        },
+        afterItem: () =>
+          h(
+            'span',
+            { class: 'conf-switch' },
+            'restore mode is supported since flink 1.15, usually, you do not 
have to set this parameter',
+          ),
+        ifShow: ({ values }) =>
+          values.startSavePointed && 
checkFlinkVersion(receiveData.application.versionId),
+      },
       {
         field: 'allowNonRestoredState',
         label: 'ignore restored',
@@ -116,8 +142,10 @@
       const formValue = (await validate()) as Recordable;
       const savePointed = formValue.startSavePointed;
       const savePointPath = savePointed ? formValue['startSavePoint'] : null;
+      const restoreMode = savePointed ? formValue['restoreMode'] : null;
       const { data } = await fetchStart({
         id: receiveData.application.id,
+        restoreMode,
         savePointed,
         savePoint: savePointPath,
         allowNonRestored: formValue.allowNonRestoredState || false,
@@ -162,6 +190,17 @@
       console.error(error);
     }
   }
+
+  function checkFlinkVersion(versionId: string) {
+    let env = unref(flinkEnvs).filter((env) => env.id == versionId)[0];
+    return parseInt(env.version.split('.')[1]) >= 15;
+  }
+
+  onMounted(() => {
+    fetchFlinkEnv().then((res) => {
+      flinkEnvs.value = res;
+    });
+  });
 </script>
 <template>
   <BasicModal
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 1d7725fbb..2464e26f1 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -61,6 +61,7 @@ case class SubmitRequest(
     appConf: String,
     applicationType: ApplicationType,
     savePoint: String,
+    restoreMode: RestoreMode,
     args: String,
     @Nullable buildResult: BuildResult,
     @Nullable k8sSubmitParam: KubernetesSubmitParam,
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 325bc9ff3..64c67098e 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.flink.client.`trait`
 
 import org.apache.streampark.common.conf.ConfigConst._
 import org.apache.streampark.common.conf.Workspace
-import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, 
ExecutionMode}
+import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, 
ExecutionMode, RestoreMode}
 import org.apache.streampark.common.util.{DeflaterUtils, Logger}
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.core.FlinkClusterClient
@@ -115,6 +115,12 @@ trait FlinkClientTrait extends Logger {
       flinkConfig.setBoolean(
         SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
         submitRequest.allowNonRestoredState)
+      if (
+        submitRequest.flinkVersion.checkVersion(
+          RestoreMode.SINCE_FLINK_VERSION) && submitRequest.restoreMode != null
+      ) {
+        flinkConfig.setString(RestoreMode.RESTORE_MODE, 
submitRequest.restoreMode.getName);
+      }
     }
 
     // set JVMOptions..

Reply via email to