This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch issue-3749
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit d16c9fe7aa15fe5bdd59fde9e67112778b465f87
Author: benjobs <[email protected]>
AuthorDate: Sat Jun 15 16:03:16 2024 +0800

    [Improve] flink job restore state from checkpoint bug fixed. #3749
---
 .../core/service/impl/ApplicationServiceImpl.java        |  9 +++++++++
 .../console/core/task/CheckpointProcessor.java           | 16 ++++++++++++++--
 .../streampark-console-webapp/src/api/base/system.ts     |  2 --
 .../src/views/flink/app/Detail.vue                       |  4 +---
 .../src/views/setting/FlinkHome/components/Modal.vue     | 11 +++++++++--
 5 files changed, 33 insertions(+), 9 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index dcdae2437..1028b37be 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -79,6 +79,7 @@ import 
org.apache.streampark.console.core.service.ServiceHelper;
 import org.apache.streampark.console.core.service.SettingService;
 import org.apache.streampark.console.core.service.VariableService;
 import org.apache.streampark.console.core.service.YarnQueueService;
+import org.apache.streampark.console.core.task.CheckpointProcessor;
 import org.apache.streampark.console.core.task.FlinkAppHttpWatcher;
 import org.apache.streampark.console.core.task.FlinkK8sWatcherWrapper;
 import org.apache.streampark.flink.client.FlinkClient;
@@ -214,6 +215,8 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
   @Autowired private FlinkK8sWatcherWrapper k8sWatcherWrapper;
 
+  @Autowired private CheckpointProcessor checkpointProcessor;
+
   private static final int CPU_NUM = Math.max(2, 
Runtime.getRuntime().availableProcessors() * 4);
 
   private final ExecutorService bootstrapExecutor =
@@ -1690,6 +1693,12 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
 
           // 3) success
           applicationLog.setSuccess(true);
+
+          // issue: https://github.com/apache/incubator-streampark/issues/3749
+          if (appParam.getSavePointed() == null || !appParam.getSavePointed()) 
{
+            checkpointProcessor.resetCheckpointNum(appParam.getId());
+          }
+
           if (response.flinkConfig() != null) {
             String jmMemory = 
response.flinkConfig().get(ConfigConst.KEY_FLINK_JM_PROCESS_MEMORY());
             if (jmMemory != null) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
index 8fc19a78b..e08d37102 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/CheckpointProcessor.java
@@ -90,7 +90,7 @@ public class CheckpointProcessor {
         return;
       }
 
-      Long latestChkId = getLatestCheckpointedId(appId, 
checkPointKey.getCheckPointId());
+      Long latestChkId = getLatestCheckpointId(appId, 
checkPointKey.getCheckPointId());
       if (checkSaveAsCheckpoint(checkPoint, latestChkId)) {
         checkPointCache.put(checkPointKey.getCheckPointId(), 
checkPoint.getId());
         saveSavepoint(checkPoint, application.getId());
@@ -131,6 +131,18 @@ public class CheckpointProcessor {
     }
   }
 
+  // issue: https://github.com/apache/incubator-streampark/issues/3749
+  public void resetCheckpointNum(Long appId) {
+    checkPointCache
+        .asMap()
+        .forEach(
+            (k, v) -> {
+              if (k.startsWith(appId.toString())) {
+                checkPointCache.invalidate(k);
+              }
+            });
+  }
+
   private boolean checkSaveAsCheckpoint(@Nonnull CheckPoints.CheckPoint 
checkPoint, Long latestId) {
     return !checkPoint.getIsSavepoint() && (latestId == null || latestId < 
checkPoint.getId());
   }
@@ -148,7 +160,7 @@ public class CheckpointProcessor {
   }
 
   @Nullable
-  private Long getLatestCheckpointedId(Long appId, String cacheId) {
+  private Long getLatestCheckpointId(Long appId, String cacheId) {
     return checkPointCache.get(
         cacheId,
         key -> {
diff --git 
a/streampark-console/streampark-console-webapp/src/api/base/system.ts 
b/streampark-console/streampark-console-webapp/src/api/base/system.ts
index d117f7b57..9a517fddd 100644
--- a/streampark-console/streampark-console-webapp/src/api/base/system.ts
+++ b/streampark-console/streampark-console-webapp/src/api/base/system.ts
@@ -19,10 +19,8 @@ import {
   RolePageParams,
   RolePageListGetResultModel,
   MenuListModel,
-  UserListGetResultModel,
 } from './model/systemModel';
 import { defHttp } from '/@/utils/http/axios';
-import { BasicPageParams } from '/@/api/model/baseModel';
 
 enum Api {
   MenuList = '/menu/list',
diff --git 
a/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue 
b/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue
index bdf731c80..b949cc1b7 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/flink/app/Detail.vue
@@ -29,7 +29,7 @@
   import { Description, useDescription } from '/@/components/Description';
   import { Icon } from '/@/components/Icon';
   import { useRoute, useRouter } from 'vue-router';
-  import { fetchBackUps, fetchGet, fetchOptionLog, fetchYarn } from 
'/@/api/flink/app/app';
+  import { fetchGet, fetchOptionLog, fetchYarn } from '/@/api/flink/app/app';
   import { onUnmounted, reactive, h, unref, ref, onMounted, computed } from 
'vue';
   import { useIntervalFn, useClipboard } from '@vueuse/core';
   import { AppListRecord } from '/@/api/flink/app/app.type';
@@ -178,8 +178,6 @@
     if (confList.records.length > 0) detailTabs.showConf = true;
     if (pointHistory.records.length > 0) detailTabs.showSaveOption = true;
     if (optionList.records.length > 0) detailTabs.showOptionLog = true;
-    //const backupList = await fetchBackUps(commonParams);
-    //if (backupList.records.length > 0) detailTabs.showBackup = true;
   }
 
   /* Get yarn data */
diff --git 
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkHome/components/Modal.vue
 
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkHome/components/Modal.vue
index de0ee072d..7e3c501c7 100644
--- 
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkHome/components/Modal.vue
+++ 
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkHome/components/Modal.vue
@@ -91,8 +91,15 @@
 
   /* form submit */
   async function handleSubmit() {
-    changeOkLoading(true);
-    const formValue = await validate();
+    let formValue;
+    try {
+      formValue = await validate();
+    } catch (error) {
+      console.warn('validate error:', error);
+      return;
+    } finally {
+      changeOkLoading(false);
+    }
     // Detection environment
     const { data: resp } = await fetchCheckEnv({
       id: versionId.value,

Reply via email to